• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "cloud/asset_operation_utils.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "process_system_api_adapter_impl.h"
23 #include "relational_store_client.h"
24 #include "relational_store_instance.h"
25 #include "relational_store_manager.h"
26 #include "runtime_config.h"
27 #include "sqlite_relational_store.h"
28 #include "sqlite_relational_utils.h"
29 #include "time_helper.h"
30 #include "virtual_asset_loader.h"
31 #include "virtual_cloud_data_translate.h"
32 #include "virtual_cloud_db.h"
33 #include <gtest/gtest.h>
34 #include <iostream>
35 
36 using namespace testing::ext;
37 using namespace DistributedDB;
38 using namespace DistributedDBUnitTest;
39 using namespace std;
40 
41 namespace {
42 const string STORE_ID = "Relational_Store_Lock_Sync";
43 const string DB_SUFFIX = ".db";
44 const string ASSETS_TABLE_NAME = "student";
45 const string ASSETS_TABLE_NAME_SHARED = "student_shared";
46 const string DEVICE_CLOUD = "cloud_dev";
47 const string COL_ID = "id";
48 const string COL_NAME = "name";
49 const string COL_HEIGHT = "height";
50 const string COL_ASSET = "asset";
51 const string COL_ASSETS = "assets";
52 const string COL_AGE = "age";
53 const int64_t WAIT_TIME = 5;
54 const std::vector<Field> CLOUD_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true}, {COL_NAME, TYPE_INDEX<std::string>},
55     {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>}};
56 const string CREATE_SINGLE_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + ASSETS_TABLE_NAME + "(" + COL_ID +
57     " INTEGER PRIMARY KEY," + COL_NAME + " TEXT ," + COL_ASSET + " ASSET," + COL_ASSETS + " ASSETS" + ");";
58 const Asset ASSET_COPY = {.version = 1,
59     .name = "Phone",
60     .assetId = "0",
61     .subpath = "/local/sync",
62     .uri = "/local/sync",
63     .modifyTime = "123456",
64     .createTime = "",
65     .size = "256",
66     .hash = "ASE"};
67 const Assets ASSETS_COPY1 = { ASSET_COPY };
68 const string ASSET_SUFFIX = "_copy";
69 
70 string g_storePath;
71 string g_testDir;
72 RelationalStoreObserverUnitTest *g_observer = nullptr;
73 DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
74 RelationalStoreDelegate *g_delegate = nullptr;
75 std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
76 std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
77 std::shared_ptr<VirtualCloudDataTranslate> g_virtualCloudDataTranslate;
78 SyncProcess g_syncProcess;
79 std::condition_variable g_processCondition;
80 std::mutex g_processMutex;
81 IRelationalStore *g_store = nullptr;
82 ICloudSyncStorageHook *g_cloudStoreHook = nullptr;
83 int64_t g_nameId;
84 using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
85 
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)86 void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
87 {
88     TableSchema assetsTableSchema = {.name = ASSETS_TABLE_NAME, .sharedTableName = ASSETS_TABLE_NAME_SHARED,
89         .fields = CLOUD_FIELDS};
90     dataBaseSchema.tables.push_back(assetsTableSchema);
91 }
92 
CloseDb()93 void CloseDb()
94 {
95     delete g_observer;
96     g_virtualCloudDb = nullptr;
97     if (g_delegate != nullptr) {
98         EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
99         g_delegate = nullptr;
100     }
101 }
102 
103 class DistributedDBCloudSyncerLockTest : public testing::Test {
104 public:
105     static void SetUpTestCase(void);
106     static void TearDownTestCase(void);
107     void SetUp();
108     void TearDown();
109 
110 protected:
111     void Init();
112     const RelationalSyncAbleStorage *GetRelationalStore();
113     void InsertLocalData(int64_t begin, int64_t count, const std::string &tableName, bool isAssetNull = true);
114     void GenerateDataRecords(
115         int64_t begin, int64_t count, int64_t gidStart, std::vector<VBucket> &record, std::vector<VBucket> &extend);
116     void InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart, const std::string &tableName);
117     void UpdateCloudDBData(int64_t begin, int64_t count, int64_t gidStart, int64_t versionStart,
118         const std::string &tableName);
119     void DeleteCloudDBData(int64_t beginGid, int64_t count, const std::string &tableName);
120     void CallSync(const CloudSyncOption &option, DBStatus expectResult = OK);
121 
122     void TestConflictSync001(bool isUpdate);
123     void CheckAssetStatusNormal();
124     void UpdateCloudAssets(Asset &asset, Assets &assets, const std::string &version);
125     void CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated = false);
126     sqlite3 *db = nullptr;
127 };
128 
SetUpTestCase(void)129 void DistributedDBCloudSyncerLockTest::SetUpTestCase(void)
130 {
131     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
132     g_storePath = g_testDir + "/" + STORE_ID + DB_SUFFIX;
133     LOGI("The test db is:%s", g_storePath.c_str());
134     g_virtualCloudDataTranslate = std::make_shared<VirtualCloudDataTranslate>();
135     RuntimeConfig::SetCloudTranslate(g_virtualCloudDataTranslate);
136 }
137 
TearDownTestCase(void)138 void DistributedDBCloudSyncerLockTest::TearDownTestCase(void) {}
139 
SetUp(void)140 void DistributedDBCloudSyncerLockTest::SetUp(void)
141 {
142     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
143         LOGE("rm test db files error.");
144     }
145     DistributedDBToolsUnitTest::PrintTestCaseInfo();
146     LOGD("Test dir is %s", g_testDir.c_str());
147     Init();
148     g_cloudStoreHook = (ICloudSyncStorageHook *) GetRelationalStore();
149     ASSERT_NE(g_cloudStoreHook, nullptr);
150 }
151 
TearDown(void)152 void DistributedDBCloudSyncerLockTest::TearDown(void)
153 {
154     RefObject::DecObjRef(g_store);
155     g_virtualCloudDb->ForkUpload(nullptr);
156     CloseDb();
157     EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
158     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
159         LOGE("rm test db files error.");
160     }
161 }
162 
Init()163 void DistributedDBCloudSyncerLockTest::Init()
164 {
165     db = RelationalTestUtils::CreateDataBase(g_storePath);
166     ASSERT_NE(db, nullptr);
167     EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
168     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_SINGLE_PRIMARY_KEY_TABLE), SQLITE_OK);
169     g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
170     ASSERT_NE(g_observer, nullptr);
171     ASSERT_EQ(g_mgr.OpenStore(g_storePath, STORE_ID, RelationalStoreDelegate::Option{.observer = g_observer},
172         g_delegate), DBStatus::OK);
173     ASSERT_NE(g_delegate, nullptr);
174     ASSERT_EQ(g_delegate->CreateDistributedTable(ASSETS_TABLE_NAME, CLOUD_COOPERATION), DBStatus::OK);
175     g_virtualCloudDb = make_shared<VirtualCloudDb>();
176     g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
177     g_syncProcess = {};
178     ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
179     ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
180     DataBaseSchema dataBaseSchema;
181     GetCloudDbSchema(dataBaseSchema);
182     ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
183     g_nameId = 0;
184 }
185 
GetRelationalStore()186 const RelationalSyncAbleStorage* DistributedDBCloudSyncerLockTest::GetRelationalStore()
187 {
188     RelationalDBProperties properties;
189     CloudDBSyncUtilsTest::InitStoreProp(g_storePath, APP_ID, USER_ID, STORE_ID, properties);
190     int errCode = E_OK;
191     g_store = RelationalStoreInstance::GetDataBase(properties, errCode);
192     if (g_store == nullptr) {
193         return nullptr;
194     }
195     return static_cast<SQLiteRelationalStore *>(g_store)->GetStorageEngine();
196 }
197 
198 
GenerateDataRecords(int64_t begin,int64_t count,int64_t gidStart,std::vector<VBucket> & record,std::vector<VBucket> & extend)199 void DistributedDBCloudSyncerLockTest::GenerateDataRecords(
200     int64_t begin, int64_t count, int64_t gidStart, std::vector<VBucket> &record, std::vector<VBucket> &extend)
201 {
202     for (int64_t i = begin; i < begin + count; i++) {
203         Assets assets;
204         Asset asset = ASSET_COPY;
205         asset.name = ASSET_COPY.name + std::to_string(i);
206         assets.emplace_back(asset);
207         VBucket data;
208         data.insert_or_assign(COL_ASSET, asset);
209         asset.name = ASSET_COPY.name + std::to_string(i) + "_copy";
210         assets.emplace_back(asset);
211         data.insert_or_assign(COL_ID, i);
212         data.insert_or_assign(COL_NAME, "name" + std::to_string(g_nameId++));
213         data.insert_or_assign(COL_ASSETS, assets);
214         record.push_back(data);
215 
216         VBucket log;
217         Timestamp now = TimeHelper::GetSysCurrentTime();
218         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
219         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
220         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
221         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i + gidStart));
222         extend.push_back(log);
223     }
224 }
225 
InsertLocalData(int64_t begin,int64_t count,const std::string & tableName,bool isAssetNull)226 void DistributedDBCloudSyncerLockTest::InsertLocalData(int64_t begin, int64_t count,
227     const std::string &tableName, bool isAssetNull)
228 {
229     int errCode;
230     std::vector<VBucket> record;
231     std::vector<VBucket> extend;
232     GenerateDataRecords(begin, count, 0, record, extend);
233     const string sql = "insert or replace into " + tableName + " values (?,?,?,?);";
234     for (VBucket vBucket : record) {
235         sqlite3_stmt *stmt = nullptr;
236         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
237         ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 1, std::get<int64_t>(vBucket[COL_ID])), E_OK); // 1 is id
238         ASSERT_EQ(SQLiteUtils::BindTextToStatement(stmt, 2, std::get<string>(vBucket[COL_NAME])), E_OK); // 2 is name
239         if (isAssetNull) {
240             ASSERT_EQ(sqlite3_bind_null(stmt, 3), SQLITE_OK); // 3 is asset
241         } else {
242             std::vector<uint8_t> assetBlob = g_virtualCloudDataTranslate->AssetToBlob(ASSET_COPY);
243             ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 3, assetBlob, false), E_OK); // 3 is asset
244         }
245         std::vector<uint8_t> assetsBlob = g_virtualCloudDataTranslate->AssetsToBlob(
246             std::get<Assets>(vBucket[COL_ASSETS]));
247         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 4, assetsBlob, false), E_OK); // 4 is assets
248         EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
249         SQLiteUtils::ResetStatement(stmt, true, errCode);
250     }
251 }
252 
InsertCloudDBData(int64_t begin,int64_t count,int64_t gidStart,const std::string & tableName)253 void DistributedDBCloudSyncerLockTest::InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart,
254     const std::string &tableName)
255 {
256     std::this_thread::sleep_for(std::chrono::milliseconds(1));
257     std::vector<VBucket> record;
258     std::vector<VBucket> extend;
259     GenerateDataRecords(begin, count, gidStart, record, extend);
260     ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(tableName, std::move(record), extend), DBStatus::OK);
261     std::this_thread::sleep_for(std::chrono::milliseconds(1));
262 }
263 
UpdateCloudDBData(int64_t begin,int64_t count,int64_t gidStart,int64_t versionStart,const std::string & tableName)264 void DistributedDBCloudSyncerLockTest::UpdateCloudDBData(int64_t begin, int64_t count, int64_t gidStart,
265     int64_t versionStart, const std::string &tableName)
266 {
267     std::this_thread::sleep_for(std::chrono::milliseconds(1));
268     std::vector<VBucket> record;
269     std::vector<VBucket> extend;
270     GenerateDataRecords(begin, count, gidStart, record, extend);
271     for (auto &entry: extend) {
272         entry[CloudDbConstant::VERSION_FIELD] = std::to_string(versionStart++);
273     }
274     ASSERT_EQ(g_virtualCloudDb->BatchUpdate(tableName, std::move(record), extend), DBStatus::OK);
275     std::this_thread::sleep_for(std::chrono::milliseconds(1));
276 }
277 
DeleteCloudDBData(int64_t beginGid,int64_t count,const std::string & tableName)278 void DistributedDBCloudSyncerLockTest::DeleteCloudDBData(int64_t beginGid, int64_t count,
279     const std::string &tableName)
280 {
281     Timestamp now = TimeHelper::GetSysCurrentTime();
282     std::vector<VBucket> extend;
283     for (int64_t i = 0; i < count; ++i) {
284         VBucket log;
285         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
286         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
287         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(beginGid + i));
288         extend.push_back(log);
289     }
290     ASSERT_EQ(g_virtualCloudDb->BatchDelete(tableName, extend), DBStatus::OK);
291     std::this_thread::sleep_for(std::chrono::milliseconds(count));
292 }
293 
PrepareOption(const Query & query,LockAction action,bool isPriorityTask=false,bool isCompensatedSyncOnly=false)294 CloudSyncOption PrepareOption(const Query &query, LockAction action, bool isPriorityTask = false,
295     bool isCompensatedSyncOnly = false)
296 {
297     CloudSyncOption option;
298     option.devices = { "CLOUD" };
299     option.mode = SYNC_MODE_CLOUD_MERGE;
300     option.query = query;
301     option.waitTime = WAIT_TIME;
302     option.priorityTask = isPriorityTask;
303     option.compensatedSyncOnly = isCompensatedSyncOnly;
304     option.lockAction = action;
305     return option;
306 }
307 
CallSync(const CloudSyncOption & option,DBStatus expectResult)308 void DistributedDBCloudSyncerLockTest::CallSync(const CloudSyncOption &option, DBStatus expectResult)
309 {
310     std::mutex dataMutex;
311     std::condition_variable cv;
312     bool finish = false;
313     SyncProcess last;
314     auto callback = [&last, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
315         for (const auto &item: process) {
316             if (item.second.process == DistributedDB::FINISHED) {
317                 {
318                     std::lock_guard<std::mutex> autoLock(dataMutex);
319                     finish = true;
320                     last = item.second;
321                 }
322                 cv.notify_one();
323             }
324         }
325     };
326     ASSERT_EQ(g_delegate->Sync(option, callback), expectResult);
327     if (expectResult == OK) {
328         std::unique_lock<std::mutex> uniqueLock(dataMutex);
329         cv.wait(uniqueLock, [&finish]() {
330             return finish;
331         });
332     }
333     g_syncProcess = last;
334 }
335 
TestConflictSync001(bool isUpdate)336 void DistributedDBCloudSyncerLockTest::TestConflictSync001(bool isUpdate)
337 {
338     /**
339      * @tc.steps:step1. init data and sync
340      * @tc.expected: step1. return ok.
341      */
342     int cloudCount = 20;
343     int localCount = 10;
344     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
345     InsertLocalData(0, localCount, ASSETS_TABLE_NAME, true);
346     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
347     CallSync(option);
348 
349     /**
350      * @tc.steps:step2. update local data to upload, and set hook before upload, operator cloud data which id is 1
351      * @tc.expected: step2. return ok.
352      */
353     std::string sql;
354     if (isUpdate) {
355         sql = "update " + ASSETS_TABLE_NAME + " set name = 'xxx' where id = 1;";
356     } else {
357         sql = "delete from " + ASSETS_TABLE_NAME + " where id = 1;";
358     }
359     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
360     int index = 0;
361     g_cloudStoreHook->SetDoUploadHook([&index, this]() {
362         if (++index == 1) {
363             UpdateCloudDBData(1, 1, 0, 21, ASSETS_TABLE_NAME); // 21 is version
364         }
365     });
366 
367     /**
368      * @tc.steps:step3. sync and check local data
369      * @tc.expected: step3. return ok.
370      */
371     CallSync(option);
372     sql = "select count(*) from " + ASSETS_TABLE_NAME + " where name = 'name30' AND id = '1';";
373     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
374         reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
375 }
376 
CheckAssetStatusNormal()377 void DistributedDBCloudSyncerLockTest::CheckAssetStatusNormal()
378 {
379     std::string sql = "SELECT asset, assets FROM " + ASSETS_TABLE_NAME + ";";
380     sqlite3_stmt *stmt = nullptr;
381     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
382     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
383         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
384         ASSERT_EQ(sqlite3_column_type(stmt, 1), SQLITE_BLOB);
385         Type assetBlob;
386         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Asset>, 0, assetBlob), E_OK);
387         Asset asset = g_virtualCloudDataTranslate->BlobToAsset(std::get<Bytes>(assetBlob));
388         EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
389         Type assetsBlob;
390         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, assetsBlob), E_OK);
391         Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(assetsBlob));
392         for (const auto &as : assets) {
393             EXPECT_EQ(as.status, static_cast<uint32_t>(AssetStatus::NORMAL));
394         }
395     }
396     int errCode = E_OK;
397     SQLiteUtils::ResetStatement(stmt, true, errCode);
398 }
399 
UpdateCloudAssets(Asset & asset,Assets & assets,const std::string & version)400 void DistributedDBCloudSyncerLockTest::UpdateCloudAssets(Asset &asset, Assets &assets, const std::string &version)
401 {
402     std::this_thread::sleep_for(std::chrono::milliseconds(1));
403     VBucket data;
404     std::vector<VBucket> record;
405     std::vector<VBucket> extend;
406     asset.name.empty() ? data.insert_or_assign(COL_ASSET, Nil()) : data.insert_or_assign(COL_ASSET, asset);
407     data.insert_or_assign(COL_ID, 0L);
408     data.insert_or_assign(COL_NAME, "name" + std::to_string(g_nameId++));
409     assets.empty() ? data.insert_or_assign(COL_ASSETS, Nil()) : data.insert_or_assign(COL_ASSETS, assets);
410     record.push_back(data);
411     VBucket log;
412     Timestamp now = TimeHelper::GetSysCurrentTime();
413     log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
414     log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
415     log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
416     log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(0));
417     log.insert_or_assign(CloudDbConstant::VERSION_FIELD, version);
418     extend.push_back(log);
419     ASSERT_EQ(g_virtualCloudDb->BatchUpdate(ASSETS_TABLE_NAME, std::move(record), extend), DBStatus::OK);
420     std::this_thread::sleep_for(std::chrono::milliseconds(1));
421 }
422 
CheckUploadAbnormal(OpType opType,int64_t expCnt,bool isCompensated)423 void DistributedDBCloudSyncerLockTest::CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated)
424 {
425     std::string sql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE ";
426     switch (opType) {
427         case OpType::INSERT:
428             sql += isCompensated ? " cloud_gid != '' AND version !='' AND flag&0x10=0" :
429                    " cloud_gid != '' AND version !='' AND flag=flag|0x10";
430             break;
431         case OpType::UPDATE:
432             sql += isCompensated ? " cloud_gid != '' AND version !='' AND flag&0x10=0" :
433                    " cloud_gid == '' AND version =='' AND flag=flag|0x10";
434             break;
435         case OpType::DELETE:
436             sql += " cloud_gid == '' AND version ==''";
437             break;
438         default:
439             break;
440     }
441     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
442         reinterpret_cast<void *>(expCnt), nullptr), SQLITE_OK);
443 }
444 
445 /**
446  * @tc.name: RDBUnlockCloudSync001
447  * @tc.desc: Test sync with no lock
448  * @tc.type: FUNC
449  * @tc.require:
450  * @tc.author: bty
451  */
452 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBUnlockCloudSync001, TestSize.Level0)
453 {
454     /**
455      * @tc.steps:step1. init data and sync with none lock
456      * @tc.expected: step1. return ok.
457      */
458     int cloudCount = 20;
459     int localCount = 10;
460     InsertLocalData(0, cloudCount, ASSETS_TABLE_NAME, true);
461     InsertCloudDBData(0, localCount, 0, ASSETS_TABLE_NAME);
462     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::NONE);
463     CallSync(option);
464 
465     /**
466      * @tc.steps:step2. insert or replace, check version
467      * @tc.expected: step2. return ok.
468      */
469     std::string sql = "INSERT OR REPLACE INTO " + ASSETS_TABLE_NAME + " VALUES('0', 'XX', '', '');";
470     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
471     sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
472         " where version != '' and version is not null;";
473     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
474         reinterpret_cast<void *>(cloudCount), nullptr), SQLITE_OK);
475 }
476 
477 /**
478  * @tc.name: RDBConflictCloudSync001
479  * @tc.desc: Both cloud and local are available, local version is empty, with cloud updates before upload
480  * @tc.type: FUNC
481  * @tc.require:
482  * @tc.author: bty
483  */
484 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync001, TestSize.Level0)
485 {
486     /**
487      * @tc.steps:step1. init data and set hook before upload, update cloud data which gid is 1
488      * @tc.expected: step1. return ok.
489      */
490     int cloudCount = 20;
491     int localCount = 10;
492     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
493     InsertLocalData(0, localCount, ASSETS_TABLE_NAME, true);
494     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
495     int index = 0;
__anoncb3c72070502() 496     g_cloudStoreHook->SetDoUploadHook([&index, this]() {
497         if (++index == 1) {
498             UpdateCloudDBData(1, 1, 0, 1, ASSETS_TABLE_NAME);
499         }
500     });
501 
502     /**
503      * @tc.steps:step2. sync and check local data
504      * @tc.expected: step2. return ok.
505      */
506     CallSync(option);
507     std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
508         " where flag&0x02=0 AND version='20' AND cloud_gid = '1';";
509     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
510         reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
511 }
512 
513 /**
514  * @tc.name: RDBConflictCloudSync002
515  * @tc.desc: Both cloud and local are available, with cloud updates before upload
516  * @tc.type: FUNC
517  * @tc.require:
518  * @tc.author: bty
519  */
520 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync002, TestSize.Level0)
521 {
522     TestConflictSync001(true);
523 }
524 
525 /**
526  * @tc.name: RDBConflictCloudSync003
527  * @tc.desc: Both cloud and local are available, with cloud deletes before upload
528  * @tc.type: FUNC
529  * @tc.require:
530  * @tc.author: bty
531  */
532 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync003, TestSize.Level0)
533 {
534     TestConflictSync001(false);
535 }
536 
537 /**
538  * @tc.name: RDBConflictCloudSync003
539  * @tc.desc: Both cloud and local are available, with cloud inserts before upload
540  * @tc.type: FUNC
541  * @tc.require:
542  * @tc.author: bty
543  */
544 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync004, TestSize.Level0)
545 {
546     /**
547      * @tc.steps:step1. init data and sync
548      * @tc.expected: step1. return ok.
549      */
550     int cloudCount = 20;
551     int localCount = 10;
552     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
553     InsertLocalData(0, localCount, ASSETS_TABLE_NAME, true);
554     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
555     CallSync(option);
556 
557     /**
558      * @tc.steps:step2. insert local data and set hook before upload, insert cloud data which id is 20
559      * @tc.expected: step2. return ok.
560      */
561     std::string sql = "INSERT INTO " + ASSETS_TABLE_NAME + " VALUES('20', 'XXX', NULL, NULL);";
562     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
563     int index = 0;
__anoncb3c72070602() 564     g_cloudStoreHook->SetDoUploadHook([&index, cloudCount, this]() {
565         if (++index == 1) {
566             InsertCloudDBData(cloudCount, 1, cloudCount, ASSETS_TABLE_NAME);
567         }
568     });
569 
570     /**
571      * @tc.steps:step3. set hook for batch insert, return CLOUD_VERSION_CONFLICT err
572      * @tc.expected: step3. return ok.
573      */
574     g_virtualCloudDb->ForkInsertConflict([](const std::string &tableName, VBucket &extend, VBucket &record,
__anoncb3c72070702(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 575         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
576         for (auto &[cloudRecord, cloudExtend]: cloudDataVec) {
577             int64_t cloudPk;
578             CloudStorageUtils::GetValueFromVBucket<int64_t>(COL_ID, record, cloudPk);
579             int64_t localPk;
580             CloudStorageUtils::GetValueFromVBucket<int64_t>(COL_ID, cloudRecord, localPk);
581             if (cloudPk != localPk) {
582                 continue;
583             }
584             std::string localVersion;
585             CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD, extend, localVersion);
586             std::string cloudVersion;
587             CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD, cloudExtend,
588                 cloudVersion);
589             if (localVersion != cloudVersion) {
590                 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_VERSION_CONFLICT);
591                 return CLOUD_VERSION_CONFLICT;
592             }
593         }
594         return OK;
595     });
596 
597     /**
598      * @tc.steps:step3. sync and check local data
599      * @tc.expected: step3. return ok.
600      */
601     CallSync(option);
602     sql = "select count(*) from " + ASSETS_TABLE_NAME + " where name = 'name30' AND id = '20';";
603     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
604         reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
605     for (const auto &table : g_syncProcess.tableProcess) {
606         EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
607     }
608 }
609 
610 /**
611  * @tc.name: QueryCursorTest001
612  * @tc.desc: Test cursor after querying no data
613  * @tc.type: FUNC
614  * @tc.require:
615  * @tc.author: bty
616  */
617 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest001, TestSize.Level0)
618 {
619     /**
620      * @tc.steps:step1. init data and Query with cursor tha exceeds range
621      * @tc.expected: step1. return ok.
622      */
623     int cloudCount = 20;
624     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
625     VBucket extend;
626     extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(30);
627     std::vector<VBucket> data;
628 
629     /**
630      * @tc.steps:step2. check cursor output param
631      * @tc.expected: step2. return QUERY_END.
632      */
633     EXPECT_EQ(g_virtualCloudDb->Query(ASSETS_TABLE_NAME, extend, data), QUERY_END);
634     EXPECT_EQ(std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]), std::to_string(cloudCount));
635 }
636 
637 /**
638  * @tc.name: QueryCursorTest002
639  * @tc.desc: Test cursor in conditional query sync
640  * @tc.type: FUNC
641  * @tc.require:
642  * @tc.author: bty
643  */
644 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest002, TestSize.Level0)
645 {
646     /**
647      * @tc.steps:step1. init data
648      * @tc.expected: step1. return ok.
649      */
650     int count = 10;
651     InsertCloudDBData(0, count, 0, ASSETS_TABLE_NAME);
652     InsertLocalData(0, count, ASSETS_TABLE_NAME, true);
653     std::vector<int> idVec = {2, 3};
654     CloudSyncOption option = PrepareOption(Query::Select().From(ASSETS_TABLE_NAME).In("id", idVec),
655         LockAction::DOWNLOAD, true);
656     int index = 0;
657 
658     /**
659      * @tc.steps:step2. sync and check cursor
660      * @tc.expected: step2. return ok.
661      */
__anoncb3c72070802(const std::string &, VBucket &extend) 662     g_virtualCloudDb->ForkQuery([&index](const std::string &, VBucket &extend) {
663         if (index == 1) {
664             std::string cursor;
665             CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::CURSOR_FIELD, extend, cursor);
666             EXPECT_EQ(cursor, std::string(""));
667         }
668         index++;
669     });
670     CallSync(option);
671 }
672 
673 /**
674  * @tc.name: RecordConflictTest001
675  * @tc.desc: Test the asset input param after download return CLOUD_RECORD_EXIST_CONFLICT
676  * @tc.type: FUNC
677  * @tc.require:
678  * @tc.author: bty
679  */
680 HWTEST_F(DistributedDBCloudSyncerLockTest, RecordConflictTest001, TestSize.Level0)
681 {
682     /**
683      * @tc.steps:step1. init data and sync
684      * @tc.expected: step1. return ok.
685      */
686     int count = 10;
687     InsertCloudDBData(0, count, 0, ASSETS_TABLE_NAME);
688     g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
689     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
690     int callCount = 0;
__anoncb3c72070902() 691     g_cloudStoreHook->SetSyncFinishHook([&callCount]() {
692         callCount++;
693         g_processCondition.notify_all();
694     });
695     CallSync(option);
696     {
697         std::unique_lock<std::mutex> lock(g_processMutex);
698         bool result = g_processCondition.wait_for(lock, std::chrono::seconds(WAIT_TIME),
__anoncb3c72070a02() 699             [&callCount]() { return callCount == 2; }); // 2 is compensated sync
700         ASSERT_EQ(result, true);
701     }
702 
703     /**
704      * @tc.steps:step2. sync again and check asset
705      * @tc.expected: step2. return ok.
706      */
707     g_virtualAssetLoader->SetDownloadStatus(DBStatus::OK);
__anoncb3c72070b02(std::map<std::string, Assets> &assets) 708     g_virtualAssetLoader->ForkDownload([](std::map<std::string, Assets> &assets) {
709         EXPECT_EQ(assets.find(COL_ASSET) != assets.end(), true);
710     });
711     CallSync(option);
712     {
713         std::unique_lock<std::mutex> lock(g_processMutex);
714         bool result = g_processCondition.wait_for(lock, std::chrono::seconds(WAIT_TIME),
__anoncb3c72070c02() 715             [&callCount]() { return callCount == 4; }); // 4 is compensated sync
716         ASSERT_EQ(result, true);
717     }
718     g_cloudStoreHook->SetSyncFinishHook(nullptr);
719 }
720 
721 /**
722  * @tc.name: QueryCursorTest003
723  * @tc.desc: Test whether cursor fallback
724  * @tc.type: FUNC
725  * @tc.require:
726  * @tc.author: bty
727  */
728 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest003, TestSize.Level0)
729 {
730     /**
731      * @tc.steps:step1. init cloud data and sync
732      * @tc.expected: step1. return ok.
733      */
734     int cloudCount = 10;
735     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
736     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
737     CallSync(option);
738 
739     /**
740      * @tc.steps:step2. delete cloud data and sync
741      * @tc.expected: step2. return ok.
742      */
743     DeleteCloudDBData(0, cloudCount, ASSETS_TABLE_NAME);
744     CallSync(option);
745 
746     /**
747      * @tc.steps:step3. remove data
748      * @tc.expected: step3. return ok.
749      */
750     std::string device = "";
751     ASSERT_EQ(g_delegate->RemoveDeviceData(device, DistributedDB::FLAG_ONLY), DBStatus::OK);
752 
753     /**
754      * @tc.steps:step4. insert local and check cursor
755      * @tc.expected: step4. return ok.
756      */
757     InsertLocalData(0, 1, ASSETS_TABLE_NAME, true);
758     std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
759         " where cursor='31';";
760     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
761         reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
762 }
763 
764 /**
765  * @tc.name: QueryCursorTest004
766  * @tc.desc: Test temp trigger under concurrency
767  * @tc.type: FUNC
768  * @tc.require:
769  * @tc.author: bty
770  */
771 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest004, TestSize.Level0)
772 {
773     /**
774      * @tc.steps:step1. init cloud data
775      * @tc.expected: step1. return ok.
776      */
777     int cloudCount = 10;
778     InsertLocalData(0, cloudCount, ASSETS_TABLE_NAME, true);
779     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
780 
781     /**
782      * @tc.steps:step2. set tracker table before saving cloud data
783      * @tc.expected: step2. return ok.
784      */
__anoncb3c72070d02(const std::string &table, VBucket &) 785     g_virtualCloudDb->ForkQuery([](const std::string &table, VBucket &) {
786         TrackerSchema schema = {
787             .tableName = ASSETS_TABLE_NAME, .extendColName = COL_NAME, .trackerColNames = { COL_ID }
788         };
789         EXPECT_EQ(g_delegate->SetTrackerTable(schema), WITH_INVENTORY_DATA);
790     });
791     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
792     CallSync(option);
793 
794     /**
795      * @tc.steps:step3. check extend_field and cursor
796      * @tc.expected: step3. return ok.
797      */
798     std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
799         " where data_key='0' and extend_field='name10' and cursor='31';";
800     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
801         reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
802 }
803 } // namespace
804 #endif // RELATIONAL_STORE