1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "cloud/asset_operation_utils.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "mock_asset_loader.h"
23 #include "process_system_api_adapter_impl.h"
24 #include "relational_store_client.h"
25 #include "relational_store_delegate_impl.h"
26 #include "relational_store_instance.h"
27 #include "relational_store_manager.h"
28 #include "runtime_config.h"
29 #include "sqlite_relational_store.h"
30 #include "sqlite_relational_utils.h"
31 #include "time_helper.h"
32 #include "virtual_asset_loader.h"
33 #include "virtual_cloud_data_translate.h"
34 #include "virtual_cloud_db.h"
35 #include "virtual_communicator_aggregator.h"
36 #include <gtest/gtest.h>
37 #include <iostream>
38
39 using namespace testing::ext;
40 using namespace DistributedDB;
41 using namespace DistributedDBUnitTest;
42 using namespace std;
43
44 namespace {
45 const string STORE_ID = "Relational_Store_SYNC";
46 const string DB_SUFFIX = ".db";
47 const string ASSETS_TABLE_NAME = "student";
48 const string ASSETS_TABLE_NAME_SHARED = "student_shared";
49 const string NO_PRIMARY_TABLE = "teacher";
50 const string NO_PRIMARY_TABLE_SHARED = "teacher_shared";
51 const string COMPOUND_PRIMARY_TABLE = "worker1";
52 const string COMPOUND_PRIMARY_TABLE_SHARED = "worker1_shared";
53 const string DEVICE_CLOUD = "cloud_dev";
54 const string COL_ID = "id";
55 const string COL_NAME = "name";
56 const string COL_HEIGHT = "height";
57 const string COL_ASSET = "asset";
58 const string COL_ASSETS = "assets";
59 const string COL_AGE = "age";
60 const int64_t SYNC_WAIT_TIME = 600;
61 const int64_t COMPENSATED_SYNC_WAIT_TIME = 5;
62 const std::vector<Field> CLOUD_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true}, {COL_NAME, TYPE_INDEX<std::string>},
63 {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>},
64 {COL_AGE, TYPE_INDEX<int64_t>}};
65 const std::vector<Field> NO_PRIMARY_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>}, {COL_NAME, TYPE_INDEX<std::string>},
66 {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>},
67 {COL_AGE, TYPE_INDEX<int64_t>}};
68 const std::vector<Field> COMPOUND_PRIMARY_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true},
69 {COL_NAME, TYPE_INDEX<std::string>}, {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>},
70 {COL_ASSETS, TYPE_INDEX<Assets>}, {COL_AGE, TYPE_INDEX<int64_t>, true}};
71 const string CREATE_SINGLE_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + ASSETS_TABLE_NAME + "(" + COL_ID +
72 " INTEGER PRIMARY KEY," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," +
73 COL_ASSETS + " ASSETS," + COL_AGE + " INT);";
74 const string CREATE_NO_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + NO_PRIMARY_TABLE + "(" + COL_ID +
75 " INTEGER," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," + COL_ASSETS +
76 " ASSETS," + COL_AGE + " INT);";
77 const string CREATE_COMPOUND_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + COMPOUND_PRIMARY_TABLE + "(" + COL_ID +
78 " INTEGER," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," + COL_ASSETS + " ASSETS," +
79 COL_AGE + " INT, PRIMARY KEY (id, age));";
80 const Asset ASSET_COPY = {.version = 1,
81 .name = "Phone",
82 .assetId = "0",
83 .subpath = "/local/sync",
84 .uri = "/local/sync",
85 .modifyTime = "123456",
86 .createTime = "",
87 .size = "256",
88 .hash = "ASE"};
89 const Asset ASSET_COPY2 = {.version = 1,
90 .name = "Phone_copy_2",
91 .assetId = "0",
92 .subpath = "/local/sync",
93 .uri = "/local/sync",
94 .modifyTime = "123456",
95 .createTime = "",
96 .size = "256",
97 .hash = "ASE"};
98 const Assets ASSETS_COPY1 = { ASSET_COPY, ASSET_COPY2 };
99 const std::string QUERY_CONSISTENT_SQL = "select count(*) from naturalbase_rdb_aux_student_log where flag&0x20=0;";
100 const std::string QUERY_COMPENSATED_SQL = "select count(*) from naturalbase_rdb_aux_student_log where flag&0x10!=0;";
101
102 string g_storePath;
103 string g_testDir;
104 RelationalStoreObserverUnitTest *g_observer = nullptr;
105 DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
106 RelationalStoreDelegate *g_delegate = nullptr;
107 std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
108 std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
109 std::shared_ptr<VirtualCloudDataTranslate> g_virtualCloudDataTranslate;
110 SyncProcess g_syncProcess;
111 std::condition_variable g_processCondition;
112 std::mutex g_processMutex;
113 IRelationalStore *g_store = nullptr;
114 ICloudSyncStorageHook *g_cloudStoreHook = nullptr;
115 using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
116
InitDatabase(sqlite3 * & db)117 void InitDatabase(sqlite3 *&db)
118 {
119 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
120 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_SINGLE_PRIMARY_KEY_TABLE), SQLITE_OK);
121 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_NO_PRIMARY_KEY_TABLE), SQLITE_OK);
122 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_COMPOUND_PRIMARY_KEY_TABLE), SQLITE_OK);
123 }
124
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)125 void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
126 {
127 TableSchema assetsTableSchema = {.name = ASSETS_TABLE_NAME, .sharedTableName = ASSETS_TABLE_NAME_SHARED,
128 .fields = CLOUD_FIELDS};
129 dataBaseSchema.tables.push_back(assetsTableSchema);
130 assetsTableSchema = {.name = NO_PRIMARY_TABLE, .sharedTableName = NO_PRIMARY_TABLE_SHARED,
131 .fields = NO_PRIMARY_FIELDS};
132 dataBaseSchema.tables.push_back(assetsTableSchema);
133 assetsTableSchema = {.name = COMPOUND_PRIMARY_TABLE, .sharedTableName = COMPOUND_PRIMARY_TABLE_SHARED,
134 .fields = COMPOUND_PRIMARY_FIELDS};
135 dataBaseSchema.tables.push_back(assetsTableSchema);
136 }
137
GenerateDataRecords(int64_t begin,int64_t count,int64_t gidStart,std::vector<VBucket> & record,std::vector<VBucket> & extend)138 void GenerateDataRecords(
139 int64_t begin, int64_t count, int64_t gidStart, std::vector<VBucket> &record, std::vector<VBucket> &extend)
140 {
141 for (int64_t i = begin; i < begin + count; i++) {
142 Assets assets;
143 Asset asset = ASSET_COPY;
144 asset.name = ASSET_COPY.name + std::to_string(i);
145 assets.emplace_back(asset);
146 asset.name = ASSET_COPY.name + std::to_string(i) + "_copy";
147 assets.emplace_back(asset);
148 VBucket data;
149 data.insert_or_assign(COL_ID, i);
150 data.insert_or_assign(COL_NAME, "name" + std::to_string(i));
151 data.insert_or_assign(COL_HEIGHT, 166.0 * i); // 166.0 is random double value
152 data.insert_or_assign(COL_ASSETS, assets);
153 data.insert_or_assign(COL_AGE, 18L + i); // 18 is random int value
154 record.push_back(data);
155
156 VBucket log;
157 Timestamp now = TimeHelper::GetSysCurrentTime();
158 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
159 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
160 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
161 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i + gidStart));
162 extend.push_back(log);
163 }
164 }
165
InsertLocalData(sqlite3 * & db,int64_t begin,int64_t count,const std::string & tableName,bool isAssetNull=true)166 void InsertLocalData(sqlite3 *&db, int64_t begin, int64_t count, const std::string &tableName, bool isAssetNull = true)
167 {
168 int errCode;
169 std::vector<VBucket> record;
170 std::vector<VBucket> extend;
171 GenerateDataRecords(begin, count, 0, record, extend);
172 const string sql = "insert or replace into " + tableName + " values (?,?,?,?,?,?);";
173 for (VBucket vBucket : record) {
174 sqlite3_stmt *stmt = nullptr;
175 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
176 ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 1, std::get<int64_t>(vBucket[COL_ID])), E_OK); // 1 is id
177 ASSERT_EQ(SQLiteUtils::BindTextToStatement(stmt, 2, std::get<string>(vBucket[COL_NAME])), E_OK); // 2 is name
178 ASSERT_EQ(SQLiteUtils::MapSQLiteErrno(
179 sqlite3_bind_double(stmt, 3, std::get<double>(vBucket[COL_HEIGHT]))), E_OK); // 3 is height
180 if (isAssetNull) {
181 ASSERT_EQ(sqlite3_bind_null(stmt, 4), SQLITE_OK); // 4 is asset
182 } else {
183 std::vector<uint8_t> assetBlob = g_virtualCloudDataTranslate->AssetToBlob(ASSET_COPY);
184 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 4, assetBlob, false), E_OK); // 4 is asset
185 }
186 std::vector<uint8_t> assetsBlob = g_virtualCloudDataTranslate->AssetsToBlob(
187 std::get<Assets>(vBucket[COL_ASSETS]));
188 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 5, assetsBlob, false), E_OK); // 5 is assets
189 ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 6, std::get<int64_t>(vBucket[COL_AGE])), E_OK); // 6 is age
190 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
191 SQLiteUtils::ResetStatement(stmt, true, errCode);
192 }
193 }
194
UpdateLocalData(sqlite3 * & db,const std::string & tableName,const Assets & assets,bool isEmptyAssets=false)195 void UpdateLocalData(sqlite3 *&db, const std::string &tableName, const Assets &assets, bool isEmptyAssets = false)
196 {
197 int errCode;
198 std::vector<uint8_t> assetBlob;
199 const string sql = "update " + tableName + " set assets=?;";
200 sqlite3_stmt *stmt = nullptr;
201 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
202 if (isEmptyAssets) {
203 ASSERT_EQ(sqlite3_bind_null(stmt, 1), SQLITE_OK);
204 } else {
205 assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
206 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
207 }
208 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
209 SQLiteUtils::ResetStatement(stmt, true, errCode);
210 }
211
UpdateLocalData(sqlite3 * & db,const std::string & tableName,const Assets & assets,int32_t begin,int32_t end)212 void UpdateLocalData(sqlite3 *&db, const std::string &tableName, const Assets &assets, int32_t begin, int32_t end)
213 {
214 int errCode;
215 std::vector<uint8_t> assetBlob;
216 const string sql = "update " + tableName + " set assets=? " + "where id>=" + std::to_string(begin) +
217 " and id<=" + std::to_string(end) + ";";
218 sqlite3_stmt *stmt = nullptr;
219 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
220 assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
221 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
222 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
223 SQLiteUtils::ResetStatement(stmt, true, errCode);
224 }
225
DeleteLocalRecord(sqlite3 * & db,int64_t begin,int64_t count,const std::string & tableName)226 void DeleteLocalRecord(sqlite3 *&db, int64_t begin, int64_t count, const std::string &tableName)
227 {
228 ASSERT_NE(db, nullptr);
229 for (int64_t i = begin; i < begin + count; i++) {
230 string sql = "DELETE FROM " + tableName + " WHERE id ='" + std::to_string(i) + "';";
231 ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db, sql), E_OK);
232 }
233 }
234
DeleteCloudDBData(int64_t begin,int64_t count,const std::string & tableName)235 void DeleteCloudDBData(int64_t begin, int64_t count, const std::string &tableName)
236 {
237 for (int64_t i = begin; i < begin + count; i++) {
238 VBucket idMap;
239 idMap.insert_or_assign("#_gid", std::to_string(i));
240 ASSERT_EQ(g_virtualCloudDb->DeleteByGid(tableName, idMap), DBStatus::OK);
241 }
242 }
243
UpdateCloudDBData(int64_t begin,int64_t count,int64_t gidStart,int64_t versionStart,const std::string & tableName)244 void UpdateCloudDBData(int64_t begin, int64_t count, int64_t gidStart, int64_t versionStart,
245 const std::string &tableName)
246 {
247 std::this_thread::sleep_for(std::chrono::milliseconds(1));
248 std::vector<VBucket> record;
249 std::vector<VBucket> extend;
250 GenerateDataRecords(begin, count, gidStart, record, extend);
251 for (auto &entry: extend) {
252 entry[CloudDbConstant::VERSION_FIELD] = std::to_string(versionStart++);
253 }
254 ASSERT_EQ(g_virtualCloudDb->BatchUpdate(tableName, std::move(record), extend), DBStatus::OK);
255 std::this_thread::sleep_for(std::chrono::milliseconds(1));
256 }
257
QueryStatusCallback(void * data,int count,char ** colValue,char ** colName)258 int QueryStatusCallback(void *data, int count, char **colValue, char **colName)
259 {
260 auto status = static_cast<std::vector<int64_t> *>(data);
261 const int decimal = 10;
262 for (int i = 0; i < count; i++) {
263 status->push_back(strtol(colValue[0], nullptr, decimal));
264 }
265 return 0;
266 }
267
CheckLockStatus(sqlite3 * db,int startId,int endId,LockStatus lockStatus)268 void CheckLockStatus(sqlite3 *db, int startId, int endId, LockStatus lockStatus)
269 {
270 std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
271 std::string sql = "select status from " + logName + " where data_key >=" + std::to_string(startId) +
272 " and data_key <=" + std::to_string(endId) + ";";
273 std::vector<int64_t> status;
274 char *str = NULL;
275 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), QueryStatusCallback, static_cast<void *>(&status), &str),
276 SQLITE_OK);
277 ASSERT_EQ(static_cast<size_t>(endId - startId + 1), status.size());
278
279 for (auto stat : status) {
280 ASSERT_EQ(static_cast<int64_t>(lockStatus), stat);
281 }
282 }
283
InsertCloudDBData(int64_t begin,int64_t count,int64_t gidStart,const std::string & tableName)284 void InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart, const std::string &tableName)
285 {
286 std::vector<VBucket> record;
287 std::vector<VBucket> extend;
288 GenerateDataRecords(begin, count, gidStart, record, extend);
289 if (tableName == ASSETS_TABLE_NAME_SHARED) {
290 for (auto &vBucket: record) {
291 vBucket.insert_or_assign(CloudDbConstant::CLOUD_OWNER, std::string("cloudA"));
292 }
293 }
294 ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(tableName, std::move(record), extend), DBStatus::OK);
295 }
296
WaitForSyncFinish(SyncProcess & syncProcess,const int64_t & waitTime)297 void WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime)
298 {
299 std::unique_lock<std::mutex> lock(g_processMutex);
300 bool result = g_processCondition.wait_for(
301 lock, std::chrono::seconds(waitTime), [&syncProcess]() { return syncProcess.process == FINISHED; });
302 ASSERT_EQ(result, true);
303 LOGD("-------------------sync end--------------");
304 }
305
CallSync(const std::vector<std::string> & tableNames,SyncMode mode,DBStatus dbStatus,DBStatus errCode=OK)306 void CallSync(const std::vector<std::string> &tableNames, SyncMode mode, DBStatus dbStatus, DBStatus errCode = OK)
307 {
308 g_syncProcess = {};
309 Query query = Query::Select().FromTable(tableNames);
310 std::vector<SyncProcess> expectProcess;
311 CloudSyncStatusCallback callback = [&errCode](const std::map<std::string, SyncProcess> &process) {
312 ASSERT_EQ(process.begin()->first, DEVICE_CLOUD);
313 std::unique_lock<std::mutex> lock(g_processMutex);
314 g_syncProcess = process.begin()->second;
315 if (g_syncProcess.process == FINISHED) {
316 g_processCondition.notify_one();
317 ASSERT_EQ(g_syncProcess.errCode, errCode);
318 }
319 };
320 CloudSyncOption option;
321 option.devices = {DEVICE_CLOUD};
322 option.mode = mode;
323 option.query = query;
324 option.waitTime = SYNC_WAIT_TIME;
325 option.lockAction = static_cast<LockAction>(0xff); // lock all
326 ASSERT_EQ(g_delegate->Sync(option, callback), dbStatus);
327
328 if (dbStatus == DBStatus::OK) {
329 WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME);
330 }
331 }
332
CheckDownloadForTest001(int index,map<std::string,Assets> & assets)333 void CheckDownloadForTest001(int index, map<std::string, Assets> &assets)
334 {
335 for (auto &item : assets) {
336 for (auto &asset : item.second) {
337 EXPECT_EQ(AssetOperationUtils::EraseBitMask(asset.status), static_cast<uint32_t>(AssetStatus::INSERT));
338 if (index < 4) { // 1-4 is inserted
339 EXPECT_EQ(asset.flag, static_cast<uint32_t>(AssetOpType::INSERT));
340 }
341 LOGD("asset [name]:%s, [status]:%u, [flag]:%u, [index]:%d", asset.name.c_str(), asset.status, asset.flag,
342 index);
343 }
344 }
345 }
346
CheckDownloadFailedForTest002(sqlite3 * & db)347 void CheckDownloadFailedForTest002(sqlite3 *&db)
348 {
349 std::string sql = "SELECT assets from " + ASSETS_TABLE_NAME;
350 sqlite3_stmt *stmt = nullptr;
351 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
352 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
353 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
354 Type cloudValue;
355 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
356 std::vector<uint8_t> assetsBlob;
357 Assets assets;
358 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
359 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
360 ASSERT_EQ(assets.size(), 2u); // 2 is asset num
361 for (size_t i = 0; i < assets.size(); ++i) {
362 EXPECT_EQ(assets[i].status, AssetStatus::ABNORMAL);
363 }
364 }
365 int errCode;
366 SQLiteUtils::ResetStatement(stmt, true, errCode);
367 }
368
UpdateAssetsForLocal(sqlite3 * & db,int id,uint32_t status)369 void UpdateAssetsForLocal(sqlite3 *&db, int id, uint32_t status)
370 {
371 Assets assets;
372 Asset asset = ASSET_COPY;
373 asset.name = ASSET_COPY.name + std::to_string(id);
374 asset.status = status;
375 assets.emplace_back(asset);
376 asset.name = ASSET_COPY.name + std::to_string(id) + "_copy";
377 assets.emplace_back(asset);
378 int errCode;
379 std::vector<uint8_t> assetBlob;
380 const string sql = "update " + ASSETS_TABLE_NAME + " set assets=? where id = " + std::to_string(id);
381 sqlite3_stmt *stmt = nullptr;
382 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
383 assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
384 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
385 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
386 SQLiteUtils::ResetStatement(stmt, true, errCode);
387 }
388
CheckConsistentCount(sqlite3 * db,int64_t expectCount)389 void CheckConsistentCount(sqlite3 *db, int64_t expectCount)
390 {
391 EXPECT_EQ(sqlite3_exec(db, QUERY_CONSISTENT_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
392 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
393 }
394
CheckCompensatedCount(sqlite3 * db,int64_t expectCount)395 void CheckCompensatedCount(sqlite3 *db, int64_t expectCount)
396 {
397 EXPECT_EQ(sqlite3_exec(db, QUERY_COMPENSATED_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
398 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
399 }
400
CloseDb()401 void CloseDb()
402 {
403 if (g_delegate != nullptr) {
404 EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
405 g_delegate = nullptr;
406 }
407 delete g_observer;
408 g_virtualCloudDb = nullptr;
409 }
410
411 class DistributedDBCloudSyncerDownloadAssetsOnlyTest : public testing::Test {
412 public:
413 static void SetUpTestCase(void);
414 static void TearDownTestCase(void);
415 void SetUp();
416 void TearDown();
417
418 protected:
419 void CheckLocaLAssets(const std::string &tableName, const std::string &expectAssetId,
420 const std::set<int> &failIndex);
421 void CheckLocalAssetIsEmpty(const std::string &tableName);
422 void CheckCursorData(const std::string &tableName, int begin);
423 void WaitForSync(int &syncCount);
424 const RelationalSyncAbleStorage *GetRelationalStore();
425 void InitDataStatusTest(bool needDownload);
426 void DataStatusTest001(bool needDownload);
427 void DataStatusTest003();
428 void DataStatusTest004();
429 void DataStatusTest005();
430 void DataStatusTest006();
431 void DataStatusTest007();
432 sqlite3 *db = nullptr;
433 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
434 };
435
SetUpTestCase(void)436 void DistributedDBCloudSyncerDownloadAssetsOnlyTest::SetUpTestCase(void)
437 {
438 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
439 g_storePath = g_testDir + "/" + STORE_ID + DB_SUFFIX;
440 LOGI("The test db is:%s", g_storePath.c_str());
441 g_virtualCloudDataTranslate = std::make_shared<VirtualCloudDataTranslate>();
442 RuntimeConfig::SetCloudTranslate(g_virtualCloudDataTranslate);
443 }
444
TearDownTestCase(void)445 void DistributedDBCloudSyncerDownloadAssetsOnlyTest::TearDownTestCase(void) {}
446
SetUp(void)447 void DistributedDBCloudSyncerDownloadAssetsOnlyTest::SetUp(void)
448 {
449 RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
450 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
451 LOGE("rm test db files error.");
452 }
453 DistributedDBToolsUnitTest::PrintTestCaseInfo();
454 LOGD("Test dir is %s", g_testDir.c_str());
455 db = RelationalTestUtils::CreateDataBase(g_storePath);
456 ASSERT_NE(db, nullptr);
457 InitDatabase(db);
458 g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
459 ASSERT_NE(g_observer, nullptr);
460 ASSERT_EQ(
461 g_mgr.OpenStore(g_storePath, STORE_ID, RelationalStoreDelegate::Option{.observer = g_observer}, g_delegate),
462 DBStatus::OK);
463 ASSERT_NE(g_delegate, nullptr);
464 ASSERT_EQ(g_delegate->CreateDistributedTable(ASSETS_TABLE_NAME, CLOUD_COOPERATION), DBStatus::OK);
465 ASSERT_EQ(g_delegate->CreateDistributedTable(NO_PRIMARY_TABLE, CLOUD_COOPERATION), DBStatus::OK);
466 ASSERT_EQ(g_delegate->CreateDistributedTable(COMPOUND_PRIMARY_TABLE, CLOUD_COOPERATION), DBStatus::OK);
467 g_virtualCloudDb = make_shared<VirtualCloudDb>();
468 g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
469 g_syncProcess = {};
470 ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
471 ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
472 DataBaseSchema dataBaseSchema;
473 GetCloudDbSchema(dataBaseSchema);
474 ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
475 g_cloudStoreHook = (ICloudSyncStorageHook *) GetRelationalStore();
476 ASSERT_NE(g_cloudStoreHook, nullptr);
477 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
478 ASSERT_TRUE(communicatorAggregator_ != nullptr);
479 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
480 }
481
TearDown(void)482 void DistributedDBCloudSyncerDownloadAssetsOnlyTest::TearDown(void)
483 {
484 RefObject::DecObjRef(g_store);
485 g_virtualCloudDb->ForkUpload(nullptr);
486 CloseDb();
487 EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
488 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
489 LOGE("rm test db files error.");
490 }
491 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
492 communicatorAggregator_ = nullptr;
493 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
494 }
495
CheckLocaLAssets(const std::string & tableName,const std::string & expectAssetId,const std::set<int> & failIndex)496 void DistributedDBCloudSyncerDownloadAssetsOnlyTest::CheckLocaLAssets(const std::string &tableName,
497 const std::string &expectAssetId, const std::set<int> &failIndex)
498 {
499 std::string sql = "SELECT assets FROM " + tableName + ";";
500 sqlite3_stmt *stmt = nullptr;
501 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
502 int index = 0;
503 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
504 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
505 Type cloudValue;
506 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
507 Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
508 for (const auto &asset : assets) {
509 index++;
510 if (failIndex.find(index) != failIndex.end()) {
511 EXPECT_EQ(asset.assetId, "0");
512 } else {
513 EXPECT_EQ(asset.assetId, expectAssetId);
514 }
515 }
516 }
517 int errCode = E_OK;
518 SQLiteUtils::ResetStatement(stmt, true, errCode);
519 }
520
CheckLocalAssetIsEmpty(const std::string & tableName)521 void DistributedDBCloudSyncerDownloadAssetsOnlyTest::CheckLocalAssetIsEmpty(const std::string &tableName)
522 {
523 std::string sql = "SELECT asset FROM " + tableName + ";";
524 sqlite3_stmt *stmt = nullptr;
525 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
526 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
527 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_NULL);
528 }
529 int errCode = E_OK;
530 SQLiteUtils::ResetStatement(stmt, true, errCode);
531 }
532
CheckCursorData(const std::string & tableName,int begin)533 void DistributedDBCloudSyncerDownloadAssetsOnlyTest::CheckCursorData(const std::string &tableName, int begin)
534 {
535 std::string logTableName = DBCommon::GetLogTableName(tableName);
536 std::string sql = "SELECT cursor FROM " + logTableName + ";";
537 sqlite3_stmt *stmt = nullptr;
538 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
539 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
540 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_INTEGER);
541 Type cloudValue;
542 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
543 EXPECT_EQ(std::get<int64_t>(cloudValue), begin);
544 begin++;
545 }
546 int errCode = E_OK;
547 SQLiteUtils::ResetStatement(stmt, true, errCode);
548 }
549
WaitForSync(int & syncCount)550 void DistributedDBCloudSyncerDownloadAssetsOnlyTest::WaitForSync(int &syncCount)
551 {
552 std::unique_lock<std::mutex> lock(g_processMutex);
553 bool result = g_processCondition.wait_for(lock, std::chrono::seconds(COMPENSATED_SYNC_WAIT_TIME),
554 [&syncCount]() { return syncCount == 2; }); // 2 is compensated sync
555 ASSERT_EQ(result, true);
556 }
557
GetRelationalStore()558 const RelationalSyncAbleStorage* DistributedDBCloudSyncerDownloadAssetsOnlyTest::GetRelationalStore()
559 {
560 RelationalDBProperties properties;
561 CloudDBSyncUtilsTest::InitStoreProp(g_storePath, APP_ID, USER_ID, STORE_ID, properties);
562 int errCode = E_OK;
563 g_store = RelationalStoreInstance::GetDataBase(properties, errCode);
564 if (g_store == nullptr) {
565 return nullptr;
566 }
567 return static_cast<SQLiteRelationalStore *>(g_store)->GetStorageEngine();
568 }
569
InitDataStatusTest(bool needDownload)570 void DistributedDBCloudSyncerDownloadAssetsOnlyTest::InitDataStatusTest(bool needDownload)
571 {
572 int cloudCount = 20;
573 int localCount = 10;
574 InsertLocalData(db, 0, cloudCount, ASSETS_TABLE_NAME, true);
575 if (needDownload) {
576 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
577 }
578 std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
579 std::string sql = "update " + logName + " SET status = 1 where data_key in (1,11);";
580 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
581 sql = "update " + logName + " SET status = 2 where data_key in (2,12);";
582 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
583 sql = "update " + logName + " SET status = 3 where data_key in (3,13);";
584 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
585 std::this_thread::sleep_for(std::chrono::milliseconds(1));
586 InsertCloudDBData(0, localCount, 0, ASSETS_TABLE_NAME);
587 std::this_thread::sleep_for(std::chrono::milliseconds(1));
588 sql = "update " + ASSETS_TABLE_NAME + " set age='666' where id in (4);";
589 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
590 sql = "update " + logName + " SET status = 1 where data_key in (4);";
591 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
592 }
593
594 struct ProcessParam {
595 const std::map<std::string, SyncProcess> &OnProcess;
596 const SyncProcess &process;
597 std::mutex &processMutex;
598 std::condition_variable &cv;
599 bool &finish;
600 const CloudSyncStatusCallback &onFinish;
601 DBStatus expectResult;
602 };
603
HandleProcessFinish(ProcessParam & param)604 void HandleProcessFinish(ProcessParam ¶m)
605 {
606 if (param.process.process == FINISHED) {
607 if (param.onFinish) {
608 param.onFinish(param.OnProcess);
609 }
610 EXPECT_EQ(param.process.errCode, param.expectResult);
611 std::unique_lock<std::mutex> lock(param.processMutex);
612 param.finish = true;
613 param.cv.notify_one();
614 }
615 }
616
PriorityLevelSync(int32_t priorityLevel,const Query & query,const CloudSyncStatusCallback & onFinish,SyncMode mode,DBStatus expectResult=DBStatus::OK)617 void PriorityLevelSync(int32_t priorityLevel, const Query &query, const CloudSyncStatusCallback &onFinish,
618 SyncMode mode, DBStatus expectResult = DBStatus::OK)
619 {
620 std::mutex processMutex;
621 std::vector<SyncProcess> expectProcess;
622 std::condition_variable cv;
623 bool finish = false;
624 auto callback = [&cv, &onFinish, &finish, &processMutex, &expectResult]
625 (const std::map<std::string, SyncProcess> &process) {
626 for (auto &item : process) {
627 ProcessParam param = {process, item.second, processMutex, cv, finish, onFinish, expectResult};
628 HandleProcessFinish(param);
629 }
630 };
631 CloudSyncOption option;
632 option.devices = {DEVICE_CLOUD};
633 option.query = query;
634 option.mode = mode;
635 option.priorityTask = true;
636 option.priorityLevel = priorityLevel;
637 DBStatus syncResult = g_delegate->Sync(option, callback);
638 EXPECT_EQ(syncResult, DBStatus::OK);
639
640 std::unique_lock<std::mutex> lock(processMutex);
641 cv.wait(lock, [&finish]() {
642 return finish;
643 });
644 }
645
PriorityLevelSync(int32_t priorityLevel,const Query & query,SyncMode mode,DBStatus expectResult=DBStatus::OK)646 void PriorityLevelSync(int32_t priorityLevel, const Query &query, SyncMode mode, DBStatus expectResult = DBStatus::OK)
647 {
648 std::mutex processMutex;
649 std::vector<SyncProcess> expectProcess;
650 std::condition_variable cv;
651 bool finish = expectResult == DBStatus::OK ? false : true;
652 auto callback = [&cv, &finish, &processMutex]
653 (const std::map<std::string, SyncProcess> &process) {
654 for (auto &item : process) {
655 if (item.second.process == FINISHED) {
656 std::unique_lock<std::mutex> lock(processMutex);
657 finish = true;
658 cv.notify_one();
659 }
660 }
661 };
662 CloudSyncOption option;
663 option.devices = {DEVICE_CLOUD};
664 option.query = query;
665 option.mode = mode;
666 option.priorityTask = true;
667 option.priorityLevel = priorityLevel;
668 DBStatus syncResult = g_delegate->Sync(option, callback);
669 EXPECT_EQ(syncResult, expectResult);
670
671 std::unique_lock<std::mutex> lock(processMutex);
672 cv.wait(lock, [&finish]() {
673 return finish;
674 });
675 }
676
CheckAsset(sqlite3 * db,const std::string & tableName,int id,const Asset & expectAsset,bool expectFound)677 void CheckAsset(sqlite3 *db, const std::string &tableName, int id, const Asset &expectAsset, bool expectFound)
678 {
679 std::string sql = "select assets from " + tableName + " where id = " + std::to_string(id);
680 sqlite3_stmt *stmt = nullptr;
681 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
682 int errCode = SQLiteUtils::StepWithRetry(stmt);
683 ASSERT_EQ(errCode, SQLiteUtils::MapSQLiteErrno(SQLITE_ROW));
684 if (expectFound) {
685 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
686 }
687 Type cloudValue;
688 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
689 Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
690 bool found = false;
691 for (const auto &asset : assets) {
692 if (asset.name != expectAsset.name) {
693 continue;
694 }
695 found = true;
696 EXPECT_EQ(asset.status, expectAsset.status);
697 EXPECT_EQ(asset.hash, expectAsset.hash);
698 EXPECT_EQ(asset.assetId, expectAsset.assetId);
699 EXPECT_EQ(asset.uri, expectAsset.uri);
700 }
701 EXPECT_EQ(found, expectFound);
702 errCode = E_OK;
703 SQLiteUtils::ResetStatement(stmt, true, errCode);
704 EXPECT_EQ(errCode, E_OK);
705 }
706
CheckDBValue(sqlite3 * db,const std::string & tableName,int id,const std::string & field,const std::string & expectValue)707 void CheckDBValue(sqlite3 *db, const std::string &tableName, int id, const std::string &field,
708 const std::string &expectValue)
709 {
710 std::string sql = "select " + field + " from " + tableName + " where id = " + std::to_string(id);
711 sqlite3_stmt *stmt = nullptr;
712 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
713 int errCode = SQLiteUtils::StepWithRetry(stmt);
714 if (expectValue.empty()) {
715 EXPECT_EQ(errCode, SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
716 }
717 ASSERT_EQ(errCode, SQLiteUtils::MapSQLiteErrno(SQLITE_ROW));
718 std::string str;
719 (void)SQLiteUtils::GetColumnTextValue(stmt, 0, str);
720 EXPECT_EQ(str, expectValue);
721 SQLiteUtils::ResetStatement(stmt, true, errCode);
722 errCode = E_OK;
723 EXPECT_EQ(errCode, E_OK);
724 }
725
726 /**
727 * @tc.name: DownloadAssetsOnly001
728 * @tc.desc: Test sync with priorityLevel
729 * @tc.type: FUNC
730 * @tc.require:
731 * @tc.author: liaoyonghuang
732 */
733 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly001, TestSize.Level1)
734 {
735 /**
736 * @tc.steps:step1. init data
737 * @tc.expected: step1. return OK.
738 */
739 int cloudCount = 15; // 15 is num of cloud
740 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
741 /**
742 * @tc.steps:step2. Call sync with different priorityLevel
743 * @tc.expected: step2. OK
744 */
745 int syncFinishCount = 0;
746 g_virtualCloudDb->SetBlockTime(100);
__anon9ef010c90902() 747 std::thread syncThread1([&]() {
748 CloudSyncStatusCallback callback = [&syncFinishCount](const std::map<std::string, SyncProcess> &process) {
749 syncFinishCount++;
750 EXPECT_EQ(syncFinishCount, 3);
751 };
752 std::vector<int64_t> inValue = {0, 1, 2, 3, 4};
753 Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue);
754 PriorityLevelSync(0, query, callback, SyncMode::SYNC_MODE_CLOUD_MERGE);
755 });
756
__anon9ef010c90b02() 757 std::thread syncThread2([&]() {
758 CloudSyncStatusCallback callback = [&syncFinishCount](const std::map<std::string, SyncProcess> &process) {
759 syncFinishCount++;
760 EXPECT_EQ(syncFinishCount, 2);
761 };
762 std::vector<int64_t> inValue = {5, 6, 7, 8, 9};
763 Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue);
764 PriorityLevelSync(1, query, callback, SyncMode::SYNC_MODE_CLOUD_MERGE);
765 });
766
__anon9ef010c90d02() 767 std::thread syncThread3([&]() {
768 CloudSyncStatusCallback callback = [&syncFinishCount](const std::map<std::string, SyncProcess> &process) {
769 syncFinishCount++;
770 EXPECT_EQ(syncFinishCount, 1);
771 };
772 std::vector<int64_t> inValue = {10, 11, 12, 13, 14};
773 Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue);
774 PriorityLevelSync(2, query, callback, SyncMode::SYNC_MODE_CLOUD_MERGE);
775 });
776 syncThread1.join();
777 syncThread2.join();
778 syncThread3.join();
779 }
780
781 /**
782 * @tc.name: DownloadAssetsOnly002
783 * @tc.desc: Test download specified assets with unsupported mode
784 * @tc.type: FUNC
785 * @tc.require:
786 * @tc.author: liaoyonghuang
787 */
788 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly002, TestSize.Level0)
789 {
790 /**
791 * @tc.steps:step1. init data
792 * @tc.expected: step1. return OK.
793 */
794 int localDataCount = 10;
795 InsertLocalData(db, 0, localDataCount, ASSETS_TABLE_NAME, true);
796 UpdateLocalData(db, ASSETS_TABLE_NAME, {ASSET_COPY}, true);
797 int cloudCount = 10;
798 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
799 /**
800 * @tc.steps:step2. Download specified assets with mode SYNC_MODE_CLOUD_MERGE and SYNC_MODE_CLOUD_FORCE_PUSH
801 * @tc.expected: step2. sync fail
802 */
803 std::vector<int64_t> inValue = {0};
804 std::map<std::string, std::set<std::string>> assets;
805 assets["assets"] = {ASSET_COPY.name + "0"};
806 Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue).And().AssetsOnly(assets);
807
808 CloudSyncOption option;
809 option.devices = {DEVICE_CLOUD};
810 option.query = query;
811 option.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
812 option.priorityTask = true;
813 option.priorityLevel = 2u;
814 EXPECT_EQ(g_delegate->Sync(option, nullptr), DBStatus::NOT_SUPPORT);
815
816 option.mode = SyncMode::SYNC_MODE_CLOUD_FORCE_PUSH;
817 EXPECT_EQ(g_delegate->Sync(option, nullptr), DBStatus::NOT_SUPPORT);
818 }
819
820 /**
821 * @tc.name: DownloadAssetsOnly004
822 * @tc.desc: Test download specified assets
823 * @tc.type: FUNC
824 * @tc.require:
825 * @tc.author: liaoyonghuang
826 */
827 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly003, TestSize.Level0)
828 {
829 /**
830 * @tc.steps:step1. init data
831 * @tc.expected: step1. return OK.
832 */
833 int dataCount = 10;
834 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
835 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
836 for (int i = 0; i < dataCount; i++) {
837 Asset asset = ASSET_COPY;
838 asset.name += std::to_string(i);
839 asset.status = AssetStatus::UPDATE;
840 asset.hash = "local_new";
841 Assets assets = {asset};
842 asset.name += "_new";
843 assets.push_back(asset);
844 UpdateLocalData(db, ASSETS_TABLE_NAME, assets, i, i);
845 }
846 /**
847 * @tc.steps:step2. Download specified assets
848 * @tc.expected: step2. return OK.
849 */
850 std::vector<int64_t> inValue = {0};
851 std::map<std::string, std::set<std::string>> assets;
852 assets["assets"] = {ASSET_COPY.name + "0"};
853 Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue).And().AssetsOnly(assets);
854 PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
855
856 Asset assetCloud = ASSET_COPY;
857 assetCloud.name += std::to_string(0);
858 Asset assetLocal = ASSET_COPY;
859 assetLocal.name +=std::to_string(0) + "_new";
860 assetLocal.hash = "local_new";
861 assetLocal.status = AssetStatus::UPDATE;
862 CheckAsset(db, ASSETS_TABLE_NAME, 0, assetCloud, true);
863 CheckAsset(db, ASSETS_TABLE_NAME, 0, assetLocal, true);
864
865 for (int i = 1; i < dataCount; i++) {
866 Asset assetLocal1 = ASSET_COPY;
867 assetLocal1.name += std::to_string(i);
868 Asset assetLocal2 = ASSET_COPY;
869 assetLocal2.name +=std::to_string(i) + "_new";
870 assetLocal1.hash = "local_new";
871 assetLocal2.hash = "local_new";
872 assetLocal1.status = AssetStatus::UPDATE;
873 assetLocal2.status = AssetStatus::UPDATE;
874 CheckAsset(db, ASSETS_TABLE_NAME, i, assetLocal1, true);
875 CheckAsset(db, ASSETS_TABLE_NAME, i, assetLocal2, true);
876 }
877 }
878
879 /**
880 * @tc.name: DownloadAssetsOnly004
881 * @tc.desc: Test download specified assets
882 * @tc.type: FUNC
883 * @tc.require:
884 * @tc.author: liaoyonghuang
885 */
886 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly004, TestSize.Level0)
887 {
888 /**
889 * @tc.steps:step1. init data
890 * @tc.expected: step1. return OK.
891 */
892 int dataCount = 10;
893 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
894 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
895 std::vector<VBucket> record;
896 std::vector<VBucket> extend;
897 GenerateDataRecords(0, dataCount, 0, record, extend);
898 for (int i = 0; i < dataCount; i++) {
899 Asset asset1 = ASSET_COPY;
900 Asset asset2 = ASSET_COPY;
901 asset1.name += std::to_string(i);
902 asset2.name += std::to_string(i) + "_new";
903 asset1.hash = "cloud";
904 asset2.hash = "cloud";
905 Assets assets = {asset1, asset2};
906 record[i].insert_or_assign(COL_ASSETS, assets);
907 std::string newName = "name" + std::to_string(i) + "_new";
908 record[i].insert_or_assign(COL_NAME, newName);
909 }
910 ASSERT_EQ(g_virtualCloudDb->BatchUpdate(ASSETS_TABLE_NAME, std::move(record), extend), DBStatus::OK);
911 /**
912 * @tc.steps:step2. Download specified assets
913 * @tc.expected: step2. return OK.
914 */
915 std::vector<int64_t> inValue = {0};
916 std::map<std::string, std::set<std::string>> assets;
917 assets["assets"] = {ASSET_COPY.name + "0"};
918 Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue).And().AssetsOnly(assets);
919 PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
920
921 Asset assetCloud1 = ASSET_COPY;
922 assetCloud1.name += std::to_string(0);
923 assetCloud1.hash = "cloud";
924 Asset assetCloud2 = ASSET_COPY;
925 assetCloud2.name +=std::to_string(0) + "_new";
926 assetCloud2.hash = "cloud";
927 CheckAsset(db, ASSETS_TABLE_NAME, 0, assetCloud1, true);
928 CheckAsset(db, ASSETS_TABLE_NAME, 0, assetCloud2, false);
929 CheckDBValue(db, ASSETS_TABLE_NAME, 0, COL_NAME, "name0");
930
931 for (int i = 1; i < dataCount; i++) {
932 Asset assetLocal1 = ASSET_COPY;
933 assetLocal1.name += std::to_string(i);
934 Asset assetLocal2 = ASSET_COPY;
935 assetLocal2.name +=std::to_string(i) + "_new";
936 CheckAsset(db, ASSETS_TABLE_NAME, i, assetLocal1, true);
937 CheckAsset(db, ASSETS_TABLE_NAME, i, assetLocal2, false);
938 CheckDBValue(db, ASSETS_TABLE_NAME, i, COL_NAME, "name" + std::to_string(i));
939 }
940 }
941
942 /**
943 * @tc.name: DownloadAssetsOnly005
944 * @tc.desc: Test download asseets which local no found
945 * @tc.type: FUNC
946 * @tc.require:
947 * @tc.author: luoguo
948 */
949 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly005, TestSize.Level0)
950 {
951 /**
952 * @tc.steps:step1. init data
953 * @tc.expected: step1. return OK.
954 */
955 int dataCount = 10;
956 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
957 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
958 InsertCloudDBData(dataCount, 1, 0, ASSETS_TABLE_NAME);
959 /**
960 * @tc.steps:step2. Download assets which local no found
961 * @tc.expected: step2. return ASSET_NOT_FOUND_FOR_DOWN_ONLY.
962 */
963 std::vector<int64_t> inValue = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
964 std::map<std::string, std::set<std::string>> assets;
965 assets["assets"] = {ASSET_COPY.name + "10"};
966 Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue).And().AssetsOnly(assets);
967 PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::ASSET_NOT_FOUND_FOR_DOWN_ONLY);
968 }
969
970 /**
971 * @tc.name: DownloadAssetsOnly006
972 * @tc.desc: Test download asseets which cloud no found
973 * @tc.type: FUNC
974 * @tc.require:
975 * @tc.author: luoguo
976 */
977 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly006, TestSize.Level0)
978 {
979 /**
980 * @tc.steps:step1. init data
981 * @tc.expected: step1. return OK.
982 */
983 int dataCount = 10;
984 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
985 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
986 InsertLocalData(db, dataCount, 1, ASSETS_TABLE_NAME, true);
987 /**
988 * @tc.steps:step2. Download assets which cloud no found
989 * @tc.expected: step2. return ASSET_NOT_FOUND_FOR_DOWN_ONLY.
990 */
991 std::vector<int64_t> inValue = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};;
992 std::map<std::string, std::set<std::string>> assets;
993 assets["assets"] = {ASSET_COPY.name + "10"};
994 Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue).And().AssetsOnly(assets);
995 PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::ASSET_NOT_FOUND_FOR_DOWN_ONLY);
996 }
997
998 /**
999 * @tc.name: DownloadAssetsOnly007
1000 * @tc.desc: Test download specified assets with group
1001 * @tc.type: FUNC
1002 * @tc.require:
1003 * @tc.author: luoguo
1004 */
1005 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly007, TestSize.Level0)
1006 {
1007 /**
1008 * @tc.steps:step1. init data
1009 * @tc.expected: step1. return OK.
1010 */
1011 int dataCount = 10;
1012 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1013 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1014 for (int i = 0; i < dataCount; i++) {
1015 Asset asset = ASSET_COPY;
1016 asset.name += std::to_string(i);
1017 asset.status = AssetStatus::UPDATE;
1018 asset.hash = "local_new";
1019 Assets assets = {asset};
1020 asset.name += "_new";
1021 assets.push_back(asset);
1022 UpdateLocalData(db, ASSETS_TABLE_NAME, assets, i, i);
1023 }
1024 /**
1025 * @tc.steps:step2. Download specified assets
1026 * @tc.expected: step2. return OK.
1027 */
1028 std::map<std::string, std::set<std::string>> assets;
1029 assets["assets"] = {ASSET_COPY.name + "0"};
1030 std::map<std::string, std::set<std::string>> assets1;
1031 assets1["assets"] = {ASSET_COPY.name + "1"};
1032 Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1033 EndGroup().Or().BeginGroup().EqualTo("id", 1).And().AssetsOnly(assets1).EndGroup();
1034 PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
1035
1036 Asset assetCloud = ASSET_COPY;
1037 assetCloud.name += std::to_string(0);
1038 Asset assetLocal = ASSET_COPY;
1039 assetLocal.name +=std::to_string(0) + "_new";
1040 assetLocal.hash = "local_new";
1041 assetLocal.status = AssetStatus::UPDATE;
1042 CheckAsset(db, ASSETS_TABLE_NAME, 0, assetCloud, true);
1043 CheckAsset(db, ASSETS_TABLE_NAME, 0, assetLocal, true);
1044
1045 for (int i = 2; i < dataCount; i++) {
1046 Asset assetLocal1 = ASSET_COPY;
1047 assetLocal1.name += std::to_string(i);
1048 Asset assetLocal2 = ASSET_COPY;
1049 assetLocal2.name +=std::to_string(i) + "_new";
1050 assetLocal1.hash = "local_new";
1051 assetLocal2.hash = "local_new";
1052 assetLocal1.status = AssetStatus::UPDATE;
1053 assetLocal2.status = AssetStatus::UPDATE;
1054 CheckAsset(db, ASSETS_TABLE_NAME, i, assetLocal1, true);
1055 CheckAsset(db, ASSETS_TABLE_NAME, i, assetLocal2, true);
1056 }
1057 }
1058
1059 /**
1060 * @tc.name: DownloadAssetsOnly008
1061 * @tc.desc: Test download asseets which local no found
1062 * @tc.type: FUNC
1063 * @tc.require:
1064 * @tc.author: luoguo
1065 */
1066 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly008, TestSize.Level0)
1067 {
1068 /**
1069 * @tc.steps:step1. init data
1070 * @tc.expected: step1. return OK.
1071 */
1072 int dataCount = 10;
1073 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1074 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1075 InsertCloudDBData(dataCount, 1, 0, ASSETS_TABLE_NAME);
1076 /**
1077 * @tc.steps:step2. Download assets which local no found
1078 * @tc.expected: step2. return ASSET_NOT_FOUND_FOR_DOWN_ONLY.
1079 */
1080 std::vector<int64_t> inValue = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};;
1081 std::map<std::string, std::set<std::string>> assets;
1082 assets["assets"] = {ASSET_COPY.name + "0"};
1083 std::map<std::string, std::set<std::string>> assets1;
1084 assets1["assets"] = {ASSET_COPY.name + "10"};
1085 Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1086 EndGroup().Or().BeginGroup().In("id", inValue).And().AssetsOnly(assets1).EndGroup();
1087 PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::ASSET_NOT_FOUND_FOR_DOWN_ONLY);
1088 }
1089
1090 /**
1091 * @tc.name: DownloadAssetsOnly009
1092 * @tc.desc: Test download asseets which cloud no found
1093 * @tc.type: FUNC
1094 * @tc.require:
1095 * @tc.author: luoguo
1096 */
1097 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly009, TestSize.Level0)
1098 {
1099 /**
1100 * @tc.steps:step1. init data
1101 * @tc.expected: step1. return OK.
1102 */
1103 int dataCount = 10;
1104 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1105 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1106 InsertLocalData(db, dataCount, 1, ASSETS_TABLE_NAME, true);
1107 /**
1108 * @tc.steps:step2. Download assets which cloud no found
1109 * @tc.expected: step2. return ASSET_NOT_FOUND_FOR_DOWN_ONLY.
1110 */
1111 std::map<std::string, std::set<std::string>> assets;
1112 assets["assets"] = {ASSET_COPY.name + "0"};
1113 std::map<std::string, std::set<std::string>> assets1;
1114 assets1["assets"] = {ASSET_COPY.name + "10"};
1115 Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1116 EndGroup().Or().BeginGroup().EqualTo("id", 10).And().AssetsOnly(assets1).EndGroup();
1117 PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::ASSET_NOT_FOUND_FOR_DOWN_ONLY);
1118 }
1119
1120 /**
1121 * @tc.name: DownloadAssetsOnly010
1122 * @tc.desc: Test assets only multi time.
1123 * @tc.type: FUNC
1124 * @tc.require:
1125 * @tc.author: luoguo
1126 */
1127 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly010, TestSize.Level0)
1128 {
1129 /**
1130 * @tc.steps:step1. init data
1131 * @tc.expected: step1. return OK.
1132 */
1133 RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1134 int dataCount = 10;
1135 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1136 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1137
1138 /**
1139 * @tc.steps:step2. AssetsOnly twice
1140 * @tc.expected: step2. check notify count.
1141 */
1142 std::map<std::string, std::set<std::string>> assets;
1143 assets["assets"] = {ASSET_COPY.name + "0"};
1144 Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).And().
1145 AssetsOnly(assets).EndGroup();
1146 g_observer->ResetCloudSyncToZero();
1147 PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
1148 auto changedData = g_observer->GetSavedChangedData();
1149 EXPECT_EQ(changedData.size(), 0u);
1150
1151 /**
1152 * @tc.steps:step3. AssetsOnly behine EndGroup
1153 * @tc.expected: step3. check notify count.
1154 */
1155 Query query1 = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).EndGroup().And().
1156 AssetsOnly(assets);
1157 g_observer->ResetCloudSyncToZero();
1158 PriorityLevelSync(2, query1, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
1159 changedData = g_observer->GetSavedChangedData();
1160 EXPECT_EQ(changedData.size(), 0u);
1161
1162 /**
1163 * @tc.steps:step4. AssetsOnly EndGroup use And
1164 * @tc.expected: step4. check notify count.
1165 */
1166 Query query2 = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1167 EndGroup().And().BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).EndGroup();
1168 g_observer->ResetCloudSyncToZero();
1169 PriorityLevelSync(2, query2, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
1170 changedData = g_observer->GetSavedChangedData();
1171 EXPECT_EQ(changedData.size(), 0u);
1172 }
1173
1174 /**
1175 * @tc.name: DownloadAssetsOnly011
1176 * @tc.desc: Check assets only sync will up.
1177 * @tc.type: FUNC
1178 * @tc.require:
1179 * @tc.author: luoguo
1180 */
1181 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly011, TestSize.Level0)
1182 {
1183 /**
1184 * @tc.steps:step1. init data
1185 * @tc.expected: step1. return OK.
1186 */
1187 RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1188 int dataCount = 10;
1189 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1190 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1191
1192 /**
1193 * @tc.steps:step2. assets only sync
1194 * @tc.expected: step2. check assets sync result.
1195 */
1196 std::map<std::string, std::set<std::string>> assets;
1197 assets["assets"] = {ASSET_COPY.name + "0"};
1198 Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1199 EndGroup();
1200 PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
1201
1202 /**
1203 * @tc.steps:step3. check cursor and flag
1204 * @tc.expected: step3. ok.
1205 */
1206 std::string sql = "select cursor from naturalbase_rdb_aux_student_log where data_key=0;";
1207 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1208 reinterpret_cast<void *>(21u), nullptr), SQLITE_OK);
1209
1210 sql = "select flag from naturalbase_rdb_aux_student_log where data_key=0;";
1211 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1212 reinterpret_cast<void *>(0u), nullptr), SQLITE_OK);
1213 RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1214 }
1215
1216 /**
1217 * @tc.name: DownloadAssetsOnly012
1218 * @tc.desc: Test sync with same priorityLevel should be sync in order.
1219 * @tc.type: FUNC
1220 * @tc.require:
1221 * @tc.author: luoguo
1222 */
1223 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly012, TestSize.Level1)
1224 {
1225 /**
1226 * @tc.steps:step1. init data
1227 * @tc.expected: step1. return OK.
1228 */
1229 int cloudCount = 15; // 15 is num of cloud
1230 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1231 /**
1232 * @tc.steps:step2. Call sync with same priorityLevel
1233 * @tc.expected: step2. OK
1234 */
1235 int syncFinishCount = 0;
1236 g_virtualCloudDb->SetBlockTime(1000);
__anon9ef010c90f02() 1237 std::thread syncThread1([&]() {
1238 CloudSyncStatusCallback callback = [&syncFinishCount](const std::map<std::string, SyncProcess> &process) {
1239 syncFinishCount++;
1240 EXPECT_EQ(syncFinishCount, 1);
1241 };
1242 std::vector<int64_t> inValue = {0, 1, 2, 3, 4};
1243 Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue);
1244 PriorityLevelSync(0, query, callback, SyncMode::SYNC_MODE_CLOUD_MERGE);
1245 });
1246 std::this_thread::sleep_for(std::chrono::milliseconds(500));
__anon9ef010c91102() 1247 std::thread syncThread2([&]() {
1248 CloudSyncStatusCallback callback = [&syncFinishCount](const std::map<std::string, SyncProcess> &process) {
1249 syncFinishCount++;
1250 EXPECT_EQ(syncFinishCount, 2);
1251 };
1252 std::vector<int64_t> inValue = {5, 6, 7, 8, 9};
1253 Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue);
1254 PriorityLevelSync(0, query, callback, SyncMode::SYNC_MODE_CLOUD_MERGE);
1255 });
1256 syncThread1.join();
1257 syncThread2.join();
1258 }
1259
1260 /**
1261 * @tc.name: DownloadAssetsOnly013
1262 * @tc.desc: Check assets only sync no data notify.
1263 * @tc.type: FUNC
1264 * @tc.require:
1265 * @tc.author: luoguo
1266 */
1267 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly013, TestSize.Level0)
1268 {
1269 /**
1270 * @tc.steps:step1. init data
1271 * @tc.expected: step1. return OK.
1272 */
1273 RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1274 int dataCount = 10;
1275 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1276 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1277
1278 /**
1279 * @tc.steps:step2. assets only sync
1280 * @tc.expected: step2. check notify count.
1281 */
1282 std::map<std::string, std::set<std::string>> assets;
1283 assets["assets"] = {ASSET_COPY.name + "0"};
1284 Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1285 EndGroup();
1286 PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
1287 auto changedData = g_observer->GetSavedChangedData();
1288 EXPECT_EQ(changedData.size(), 1u);
1289 }
1290
1291 /**
1292 * @tc.name: DownloadAssetsOnly014
1293 * @tc.desc: test assets only sync with cloud delete data.
1294 * @tc.type: FUNC
1295 * @tc.require:
1296 * @tc.author: luoguo
1297 */
1298 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly014, TestSize.Level0)
1299 {
1300 /**
1301 * @tc.steps:step1. init data
1302 * @tc.expected: step1. return OK.
1303 */
1304 int dataCount = 10;
1305 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1306 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1307 DeleteCloudDBData(0, 1, ASSETS_TABLE_NAME);
1308 /**
1309 * @tc.steps:step2. Download assets which cloud delete.
1310 * @tc.expected: step2. return ASSET_NOT_FOUND_FOR_DOWN_ONLY.
1311 */
1312 std::map<std::string, std::set<std::string>> assets;
1313 assets["assets"] = {ASSET_COPY.name + "0"};
1314 Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1315 EndGroup();
1316 PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::ASSET_NOT_FOUND_FOR_DOWN_ONLY);
1317 }
1318
1319 /**
1320 * @tc.name: DownloadAssetsOnly015
1321 * @tc.desc: test compensated sync.
1322 * @tc.type: FUNC
1323 * @tc.require:
1324 * @tc.author: luoguo
1325 */
1326 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly015, TestSize.Level0)
1327 {
1328 /**
1329 * @tc.steps:step1. init data
1330 * @tc.expected: step1. return OK.
1331 */
1332 int dataCount = 10;
1333 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1334 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1335
1336 /**
1337 * @tc.steps:step2. set all data wait compensated.
1338 * @tc.expected: step2. return ok.
1339 */
1340 std::string sql = "update " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " set flag=flag|0x10;";
1341 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), nullptr, nullptr, nullptr), SQLITE_OK);
1342 sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " where flag&0x10=0x10;";
1343 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1344 reinterpret_cast<void *>(10u), nullptr), SQLITE_OK);
1345
1346 /**
1347 * @tc.steps:step3. sync with compensated.
1348 * @tc.expected: step3. return ok.
1349 */
1350 std::mutex processMutex;
1351 std::vector<SyncProcess> expectProcess;
1352 std::condition_variable cv;
1353 bool finish = false;
1354 auto callback = [&cv, &finish, &processMutex]
__anon9ef010c91302(const std::map<std::string, SyncProcess> &process) 1355 (const std::map<std::string, SyncProcess> &process) {
1356 for (auto &item : process) {
1357 if (item.second.process == FINISHED) {
1358 EXPECT_EQ(item.second.errCode, DBStatus::OK);
1359 std::unique_lock<std::mutex> lock(processMutex);
1360 finish = true;
1361 cv.notify_one();
1362 }
1363 }
1364 };
1365 CloudSyncOption option;
1366 option.devices = {DEVICE_CLOUD};
1367 option.priorityTask = true;
1368 option.compensatedSyncOnly = true;
1369 DBStatus syncResult = g_delegate->Sync(option, callback);
1370 EXPECT_EQ(syncResult, DBStatus::OK);
1371
1372 /**
1373 * @tc.steps:step4. wait sync finish and check data.
1374 * @tc.expected: step4. return ok.
1375 */
1376 std::unique_lock<std::mutex> lock(processMutex);
__anon9ef010c91402() 1377 cv.wait(lock, [&finish]() {
1378 return finish;
1379 });
1380 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1381 reinterpret_cast<void *>(0u), nullptr), SQLITE_OK);
1382 }
1383
1384 /**
1385 * @tc.name: DownloadAssetsOnly016
1386 * @tc.desc: test assets only sync with lock data.
1387 * @tc.type: FUNC
1388 * @tc.require:
1389 * @tc.author: luoguo
1390 */
1391 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly016, TestSize.Level0)
1392 {
1393 /**
1394 * @tc.steps:step1. init data
1395 * @tc.expected: step1. return OK.
1396 */
1397 int dataCount = 10;
1398 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1399 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1400
1401 /**
1402 * @tc.steps:step2. lock data.
1403 * @tc.expected: step2. return OK.
1404 */
1405 std::vector<std::vector<uint8_t>> hashKey;
1406 CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " cloud_gid=0 ", db, hashKey);
1407 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
1408
1409 /**
1410 * @tc.steps:step3. assets only sync.
1411 * @tc.expected: step3. return OK.
1412 */
1413 std::map<std::string, std::set<std::string>> assets;
1414 assets["assets"] = {ASSET_COPY.name + "0"};
1415 std::map<std::string, std::set<std::string>> assets1;
1416 assets1["assets"] = {ASSET_COPY.name + "1"};
1417 Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1418 EndGroup().Or().BeginGroup().EqualTo("id", 1).And().AssetsOnly(assets1).EndGroup();
1419 g_observer->ResetCloudSyncToZero();
1420 PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
1421
1422 /**
1423 * @tc.steps:step4. check asset changed data.
1424 * @tc.expected: step4. return OK.
1425 */
1426 auto changedData = g_observer->GetSavedChangedData();
1427 EXPECT_EQ(changedData.size(), 1u);
1428 auto item = changedData[ASSETS_TABLE_NAME];
1429 auto assetMsg = item.primaryData[1];
1430 EXPECT_EQ(assetMsg.size(), 1u);
1431 }
1432
1433 /**
1434 * @tc.name: DownloadAssetsOnly017
1435 * @tc.desc: test assets only sync with error priority level.
1436 * @tc.type: FUNC
1437 * @tc.require:
1438 * @tc.author: luoguo
1439 */
1440 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly017, TestSize.Level0)
1441 {
1442 /**
1443 * @tc.steps:step1. init data
1444 * @tc.expected: step1. return OK.
1445 */
1446 int dataCount = 10;
1447 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1448 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1449
1450 /**
1451 * @tc.steps:step2. assets only sync with error priority level.
1452 * @tc.expected: step2. return INVALID_ARGS.
1453 */
1454 std::map<std::string, std::set<std::string>> assets;
1455 assets["assets"] = {ASSET_COPY.name + "0"};
1456 Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1457 EndGroup();
1458 PriorityLevelSync(0, query, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::INVALID_ARGS);
1459
1460 /**
1461 * @tc.steps:step3. priority sync with error priority level.
1462 * @tc.expected: step3. return INVALID_ARGS.
1463 */
1464 query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).EndGroup();
1465 PriorityLevelSync(3, query, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::INVALID_ARGS);
1466 }
1467
1468 /**
1469 * @tc.name: DownloadAssetsOnly018
1470 * @tc.desc: test assets only sync same record can merge assets map.
1471 * @tc.type: FUNC
1472 * @tc.require:
1473 * @tc.author: luoguo
1474 */
1475 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly018, TestSize.Level0)
1476 {
1477 /**
1478 * @tc.steps:step1. init data
1479 * @tc.expected: step1. return OK.
1480 */
1481 int dataCount = 10;
1482 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1483 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1484
1485 /**
1486 * @tc.steps:step2. assets only sync.
1487 * @tc.expected: step2. return OK.
1488 */
1489 std::map<std::string, std::set<std::string>> assets;
1490 assets["assets"] = {ASSET_COPY.name + "0"};
1491 std::map<std::string, std::set<std::string>> assets1;
1492 assets1["assets"] = {ASSET_COPY.name + "0_copy"};
1493 Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1494 EndGroup().Or().BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets1).EndGroup();
1495 PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
1496
1497 /**
1498 * @tc.steps:step3. check asset changed data.
1499 * @tc.expected: step3. return OK.
1500 */
1501 auto changedData = g_observer->GetSavedChangedData();
1502 EXPECT_EQ(changedData.size(), 1u);
1503 auto item = changedData[ASSETS_TABLE_NAME];
1504 auto assetMsg = item.primaryData[1];
1505 EXPECT_EQ(assetMsg.size(), 1u);
1506 }
1507
1508 /**
1509 * @tc.name: DownloadAssetsOnly019
1510 * @tc.desc: test assets only sync with cloud delete data.
1511 * @tc.type: FUNC
1512 * @tc.require:
1513 * @tc.author: luoguo
1514 */
1515 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly019, TestSize.Level0)
1516 {
1517 /**
1518 * @tc.steps:step1. init data
1519 * @tc.expected: step1. return OK.
1520 */
1521 int dataCount = 10;
1522 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1523 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1524 DeleteCloudDBData(0, dataCount, ASSETS_TABLE_NAME);
1525 /**
1526 * @tc.steps:step2. Download assets which cloud delete.
1527 * @tc.expected: step2. return ASSET_NOT_FOUND_FOR_DOWN_ONLY.
1528 */
1529 std::map<std::string, std::set<std::string>> assets;
1530 assets["assets"] = {ASSET_COPY.name + "0"};
1531 Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1532 EndGroup();
1533 PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::ASSET_NOT_FOUND_FOR_DOWN_ONLY);
1534 }
1535
1536 /**
1537 * @tc.name: DownloadAssetsOnly020
1538 * @tc.desc: Test the consistent flag after syncing without asset
1539 * @tc.type: FUNC
1540 * @tc.require:
1541 * @tc.author: bty
1542 */
1543 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly020, TestSize.Level0)
1544 {
1545 /**
1546 * @tc.steps:step1. init data
1547 * @tc.expected: step1. return OK.
1548 */
1549 int dataCount = 30;
1550 InsertLocalData(db, 0, dataCount, ASSETS_TABLE_NAME, true);
1551 /**
1552 * @tc.steps:step2. sync
1553 * @tc.expected: step2. return OK.
1554 */
1555 int upIdx = 0;
__anon9ef010c91502(const std::string &tableName, VBucket &extend) 1556 g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
1557 upIdx++;
1558 if (upIdx > 20 && upIdx <= 30) {
1559 int64_t err = DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
1560 extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, err);
1561 }
1562 });
__anon9ef010c91602(const std::string &tableName, std::map<std::string, Assets> &) 1563 g_virtualAssetLoader->ForkDownload([](const std::string &tableName, std::map<std::string, Assets> &) {
1564 EXPECT_TRUE(false);
1565 });
1566 int queryIdx = 0;
__anon9ef010c91702(const std::string &, VBucket &) 1567 g_virtualCloudDb->ForkQuery([&queryIdx](const std::string &, VBucket &) {
1568 queryIdx++;
1569 if (queryIdx == 3) {
1570 std::vector<int64_t> inValue = {5, 6, 7, 8, 9};
1571 Query query = Query::Select().From(ASSETS_TABLE_NAME).In("id", inValue);
1572 CloudSyncOption option;
1573 option.devices = {DEVICE_CLOUD};
1574 option.query = query;
1575 option.priorityTask = true;
1576 g_delegate->Sync(option, nullptr); // In order to pause compensate sync
1577 }
1578 });
1579 int callCount = 0;
__anon9ef010c91802() 1580 g_cloudStoreHook->SetSyncFinishHook([&callCount]() {
1581 callCount++;
1582 g_processCondition.notify_one();
1583 });
1584 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1585 WaitForSync(callCount);
1586 /**
1587 * @tc.steps:step3. check count
1588 * @tc.expected: step3. return OK.
1589 */
1590 CheckConsistentCount(db, dataCount);
1591 g_virtualCloudDb->ForkUpload(nullptr);
1592 g_cloudStoreHook->SetSyncFinishHook(nullptr);
1593 g_virtualAssetLoader->ForkDownload(nullptr);
1594 g_virtualCloudDb->ForkQuery(nullptr);
1595 }
1596
1597 /**
1598 * @tc.name: DownloadAssetsOnly021
1599 * @tc.desc: test force pull mode pull mode can forcibly pull assets.
1600 * @tc.type: FUNC
1601 * @tc.require:
1602 * @tc.author: luoguo
1603 */
1604 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly021, TestSize.Level0)
1605 {
1606 /**
1607 * @tc.steps:step1. init data
1608 * @tc.expected: step1. return OK.
1609 */
1610 int dataCount = 10;
1611 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1612 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1613 /**
1614 * @tc.steps:step2. Download id 0 with force pull mode.
1615 * @tc.expected: step2. return ok.
1616 */
1617 Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).EndGroup();
1618 g_observer->ResetCloudSyncToZero();
1619 PriorityLevelSync(2, query, nullptr, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
1620 /**
1621 * @tc.steps:step3. check data type.
1622 * @tc.expected: step3. return ok.
1623 */
1624 auto data = g_observer->GetSavedChangedData();
1625 EXPECT_EQ(data.size(), 1u);
1626 EXPECT_EQ(data[ASSETS_TABLE_NAME].type, ChangedDataType::ASSET);
1627 }
1628
1629 /**
1630 * @tc.name: DownloadAssetsOnly022
1631 * @tc.desc: test assets only without and.
1632 * @tc.type: FUNC
1633 * @tc.require:
1634 * @tc.author: luoguo
1635 */
1636 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly022, TestSize.Level0)
1637 {
1638 /**
1639 * @tc.steps:step1. init data
1640 * @tc.expected: step1. return OK.
1641 */
1642 int dataCount = 10;
1643 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1644 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1645
1646 /**
1647 * @tc.steps:step2. assets only sync.
1648 * @tc.expected: step2. return INVALID_ARGS.
1649 */
1650 std::map<std::string, std::set<std::string>> assets;
1651 assets["assets"] = {ASSET_COPY.name + "0"};
1652 std::map<std::string, std::set<std::string>> assets1;
1653 assets1["assets"] = {ASSET_COPY.name + "0_copy"};
1654 Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).AssetsOnly(assets).
1655 EndGroup().Or().BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets1).EndGroup();
1656 PriorityLevelSync(2, query, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::INVALID_ARGS);
1657 }
1658
1659 /**
1660 * @tc.name: DownloadAssetsOnly023
1661 * @tc.desc: test assets only with group and.
1662 * @tc.type: FUNC
1663 * @tc.require:
1664 * @tc.author: luoguo
1665 */
1666 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsOnlyTest, DownloadAssetsOnly023, TestSize.Level0)
1667 {
1668 /**
1669 * @tc.steps:step1. init data
1670 * @tc.expected: step1. return OK.
1671 */
1672 int dataCount = 10;
1673 InsertCloudDBData(0, dataCount, 0, ASSETS_TABLE_NAME);
1674 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1675
1676 /**
1677 * @tc.steps:step2. assets only sync.
1678 * @tc.expected: step2. return Ok.
1679 */
1680 std::map<std::string, std::set<std::string>> assets;
1681 assets["assets"] = {ASSET_COPY.name + "0"};
1682 std::map<std::string, std::set<std::string>> assets1;
1683 assets1["assets"] = {ASSET_COPY.name + "0_copy"};
1684 Query query = Query::Select().From(ASSETS_TABLE_NAME).BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets).
1685 EndGroup().And().BeginGroup().EqualTo("id", 0).And().AssetsOnly(assets1).EndGroup();
1686 PriorityLevelSync(2, query, SyncMode::SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
1687 }
1688 } // namespace
1689 #endif // RELATIONAL_STORE
1690