• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "cloud/asset_operation_utils.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "process_system_api_adapter_impl.h"
23 #include "relational_store_client.h"
24 #include "relational_store_instance.h"
25 #include "relational_store_manager.h"
26 #include "runtime_config.h"
27 #include "sqlite_relational_store.h"
28 #include "sqlite_relational_utils.h"
29 #include "time_helper.h"
30 #include "virtual_asset_loader.h"
31 #include "virtual_cloud_data_translate.h"
32 #include "virtual_cloud_db.h"
33 #include "virtual_communicator_aggregator.h"
34 #include <gtest/gtest.h>
35 #include <iostream>
36 
37 using namespace testing::ext;
38 using namespace DistributedDB;
39 using namespace DistributedDBUnitTest;
40 using namespace std;
41 
42 namespace {
// Identifier of the relational store under test; also the base name of the database file.
const string STORE_ID = "Relational_Store_Lock_Sync";
// Extension appended to STORE_ID when the database path is built in SetUpTestCase.
const string DB_SUFFIX = ".db";
// Table that is created locally and registered as a distributed (cloud cooperation) table.
const string ASSETS_TABLE_NAME = "student";
// Shared-table name registered for ASSETS_TABLE_NAME in the cloud schema.
const string ASSETS_TABLE_NAME_SHARED = "student_shared";
// NOTE(review): not referenced in the visible part of this file.
const string DEVICE_CLOUD = "cloud_dev";
// Column names. COL_HEIGHT and COL_AGE are not part of the table created by
// CREATE_SINGLE_PRIMARY_KEY_TABLE below.
const string COL_ID = "id";
const string COL_NAME = "name";
const string COL_HEIGHT = "height";
const string COL_ASSET = "asset";
const string COL_ASSETS = "assets";
const string COL_AGE = "age";
// Value assigned to CloudSyncOption::waitTime in PrepareOption (unit not visible here — TODO confirm).
const int64_t WAIT_TIME = 5;
// Cloud schema fields of ASSETS_TABLE_NAME; the `true` on COL_ID presumably marks it as the
// primary key — confirm against the Field definition.
const std::vector<Field> CLOUD_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true}, {COL_NAME, TYPE_INDEX<std::string>},
    {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>}};
// DDL for the local table: integer primary key, text name, one ASSET column and one ASSETS column.
const string CREATE_SINGLE_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + ASSETS_TABLE_NAME + "(" + COL_ID +
    " INTEGER PRIMARY KEY," + COL_NAME + " TEXT ," + COL_ASSET + " ASSET," + COL_ASSETS + " ASSETS" + ");";
// Template asset; the data generators copy it and only change the name.
const Asset ASSET_COPY = {.version = 1,
    .name = "Phone",
    .assetId = "0",
    .subpath = "/local/sync",
    .uri = "/local/sync",
    .modifyTime = "123456",
    .createTime = "",
    .size = "256",
    .hash = "ASE"};
// Asset list holding a single copy of ASSET_COPY.
const Assets ASSETS_COPY1 = { ASSET_COPY };
// Same text as the suffix GenerateDataRecords appends to the second asset name of a row.
const string ASSET_SUFFIX = "_copy";
70 
// Full path of the test database file; built in SetUpTestCase.
string g_storePath;
// Directory that holds the test database files; filled by TestDirInit in SetUpTestCase.
string g_testDir;
// Observer handed to OpenStore; allocated in Init and deleted in CloseDb.
RelationalStoreObserverUnitTest *g_observer = nullptr;
DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
// Delegate of the opened store; set in Init, closed and cleared in CloseDb.
RelationalStoreDelegate *g_delegate = nullptr;
// Virtual cloud database and asset loader attached to the delegate in Init.
std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
// Cloud data translator registered with RuntimeConfig in SetUpTestCase.
std::shared_ptr<VirtualCloudDataTranslate> g_virtualCloudDataTranslate;
// Last FINISHED process reported to CallSync.
SyncProcess g_syncProcess;
// NOTE(review): g_processCondition and g_processMutex are not used in the visible part of this file.
std::condition_variable g_processCondition;
std::mutex g_processMutex;
// Store handle obtained in GetRelationalStore; its reference is dropped in TearDown.
IRelationalStore *g_store = nullptr;
// Hook interface of the storage engine; used by tests to install a pre-upload hook.
ICloudSyncStorageHook *g_cloudStoreHook = nullptr;
// Running counter used to build unique "name<N>" values; reset to 0 in Init.
int64_t g_nameId;
using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
86 
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)87 void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
88 {
89     TableSchema assetsTableSchema = {.name = ASSETS_TABLE_NAME, .sharedTableName = ASSETS_TABLE_NAME_SHARED,
90         .fields = CLOUD_FIELDS};
91     dataBaseSchema.tables.push_back(assetsTableSchema);
92 }
93 
CloseDb()94 void CloseDb()
95 {
96     if (g_delegate != nullptr) {
97         EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
98         g_delegate = nullptr;
99     }
100     if (g_observer != nullptr) {
101         delete g_observer;
102         g_observer = nullptr;
103     }
104     g_virtualCloudDb = nullptr;
105 }
106 
107 class DistributedDBCloudSyncerLockTest : public testing::Test {
108 public:
109     static void SetUpTestCase(void);
110     static void TearDownTestCase(void);
111     void SetUp();
112     void TearDown();
113 
114 protected:
115     void Init();
116     const RelationalSyncAbleStorage *GetRelationalStore();
117     void InsertLocalData(int64_t begin, int64_t count, const std::string &tableName, bool isAssetNull = true);
118     void GenerateDataRecords(
119         int64_t begin, int64_t count, int64_t gidStart, std::vector<VBucket> &record, std::vector<VBucket> &extend);
120     void InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart, const std::string &tableName);
121     void UpdateCloudDBData(int64_t begin, int64_t count, int64_t gidStart, int64_t versionStart,
122         const std::string &tableName);
123     void DeleteCloudDBData(int64_t beginGid, int64_t count, const std::string &tableName);
124     void CallSync(const CloudSyncOption &option, DBStatus expectResult = OK);
125 
126     void TestConflictSync001(bool isUpdate);
127     void CheckAssetStatusNormal();
128     void UpdateCloudAssets(Asset &asset, Assets &assets, const std::string &version);
129     void CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated = false);
130     sqlite3 *db = nullptr;
131     VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
132 };
133 
SetUpTestCase(void)134 void DistributedDBCloudSyncerLockTest::SetUpTestCase(void)
135 {
136     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
137     g_storePath = g_testDir + "/" + STORE_ID + DB_SUFFIX;
138     LOGI("The test db is:%s", g_storePath.c_str());
139     g_virtualCloudDataTranslate = std::make_shared<VirtualCloudDataTranslate>();
140     RuntimeConfig::SetCloudTranslate(g_virtualCloudDataTranslate);
141 }
142 
TearDownTestCase(void)143 void DistributedDBCloudSyncerLockTest::TearDownTestCase(void) {}
144 
SetUp(void)145 void DistributedDBCloudSyncerLockTest::SetUp(void)
146 {
147     RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
148     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
149         LOGE("rm test db files error.");
150     }
151     DistributedDBToolsUnitTest::PrintTestCaseInfo();
152     LOGD("Test dir is %s", g_testDir.c_str());
153     Init();
154     g_cloudStoreHook = (ICloudSyncStorageHook *) GetRelationalStore();
155     ASSERT_NE(g_cloudStoreHook, nullptr);
156     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
157     ASSERT_TRUE(communicatorAggregator_ != nullptr);
158     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
159 }
160 
TearDown(void)161 void DistributedDBCloudSyncerLockTest::TearDown(void)
162 {
163     RefObject::DecObjRef(g_store);
164     g_virtualCloudDb->ForkUpload(nullptr);
165     CloseDb();
166     EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
167     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
168         LOGE("rm test db files error.");
169     }
170     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
171     communicatorAggregator_ = nullptr;
172     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
173 }
174 
Init()175 void DistributedDBCloudSyncerLockTest::Init()
176 {
177     db = RelationalTestUtils::CreateDataBase(g_storePath);
178     ASSERT_NE(db, nullptr);
179     EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
180     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_SINGLE_PRIMARY_KEY_TABLE), SQLITE_OK);
181     g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
182     ASSERT_NE(g_observer, nullptr);
183     ASSERT_EQ(g_mgr.OpenStore(g_storePath, STORE_ID, RelationalStoreDelegate::Option{.observer = g_observer},
184         g_delegate), DBStatus::OK);
185     ASSERT_NE(g_delegate, nullptr);
186     ASSERT_EQ(g_delegate->CreateDistributedTable(ASSETS_TABLE_NAME, CLOUD_COOPERATION), DBStatus::OK);
187     g_virtualCloudDb = make_shared<VirtualCloudDb>();
188     g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
189     g_syncProcess = {};
190     ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
191     ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
192     DataBaseSchema dataBaseSchema;
193     GetCloudDbSchema(dataBaseSchema);
194     ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
195     g_nameId = 0;
196 }
197 
GetRelationalStore()198 const RelationalSyncAbleStorage* DistributedDBCloudSyncerLockTest::GetRelationalStore()
199 {
200     RelationalDBProperties properties;
201     CloudDBSyncUtilsTest::InitStoreProp(g_storePath, APP_ID, USER_ID, STORE_ID, properties);
202     int errCode = E_OK;
203     g_store = RelationalStoreInstance::GetDataBase(properties, errCode);
204     if (g_store == nullptr) {
205         return nullptr;
206     }
207     return static_cast<SQLiteRelationalStore *>(g_store)->GetStorageEngine();
208 }
209 
210 
GenerateDataRecords(int64_t begin,int64_t count,int64_t gidStart,std::vector<VBucket> & record,std::vector<VBucket> & extend)211 void DistributedDBCloudSyncerLockTest::GenerateDataRecords(
212     int64_t begin, int64_t count, int64_t gidStart, std::vector<VBucket> &record, std::vector<VBucket> &extend)
213 {
214     for (int64_t i = begin; i < begin + count; i++) {
215         Assets assets;
216         Asset asset = ASSET_COPY;
217         asset.name = ASSET_COPY.name + std::to_string(i);
218         assets.emplace_back(asset);
219         VBucket data;
220         data.insert_or_assign(COL_ASSET, asset);
221         asset.name = ASSET_COPY.name + std::to_string(i) + "_copy";
222         assets.emplace_back(asset);
223         data.insert_or_assign(COL_ID, i);
224         data.insert_or_assign(COL_NAME, "name" + std::to_string(g_nameId++));
225         data.insert_or_assign(COL_ASSETS, assets);
226         record.push_back(data);
227 
228         VBucket log;
229         Timestamp now = TimeHelper::GetSysCurrentTime();
230         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
231         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
232         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
233         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i + gidStart));
234         extend.push_back(log);
235     }
236 }
237 
InsertLocalData(int64_t begin,int64_t count,const std::string & tableName,bool isAssetNull)238 void DistributedDBCloudSyncerLockTest::InsertLocalData(int64_t begin, int64_t count,
239     const std::string &tableName, bool isAssetNull)
240 {
241     int errCode;
242     std::vector<VBucket> record;
243     std::vector<VBucket> extend;
244     GenerateDataRecords(begin, count, 0, record, extend);
245     const string sql = "insert or replace into " + tableName + " values (?,?,?,?);";
246     for (VBucket vBucket : record) {
247         sqlite3_stmt *stmt = nullptr;
248         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
249         ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 1, std::get<int64_t>(vBucket[COL_ID])), E_OK); // 1 is id
250         ASSERT_EQ(SQLiteUtils::BindTextToStatement(stmt, 2, std::get<string>(vBucket[COL_NAME])), E_OK); // 2 is name
251         if (isAssetNull) {
252             ASSERT_EQ(sqlite3_bind_null(stmt, 3), SQLITE_OK); // 3 is asset
253         } else {
254             std::vector<uint8_t> assetBlob = g_virtualCloudDataTranslate->AssetToBlob(ASSET_COPY);
255             ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 3, assetBlob, false), E_OK); // 3 is asset
256         }
257         std::vector<uint8_t> assetsBlob = g_virtualCloudDataTranslate->AssetsToBlob(
258             std::get<Assets>(vBucket[COL_ASSETS]));
259         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 4, assetsBlob, false), E_OK); // 4 is assets
260         EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
261         SQLiteUtils::ResetStatement(stmt, true, errCode);
262     }
263 }
264 
InsertCloudDBData(int64_t begin,int64_t count,int64_t gidStart,const std::string & tableName)265 void DistributedDBCloudSyncerLockTest::InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart,
266     const std::string &tableName)
267 {
268     std::this_thread::sleep_for(std::chrono::milliseconds(1));
269     std::vector<VBucket> record;
270     std::vector<VBucket> extend;
271     GenerateDataRecords(begin, count, gidStart, record, extend);
272     ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(tableName, std::move(record), extend), DBStatus::OK);
273     std::this_thread::sleep_for(std::chrono::milliseconds(1));
274 }
275 
UpdateCloudDBData(int64_t begin,int64_t count,int64_t gidStart,int64_t versionStart,const std::string & tableName)276 void DistributedDBCloudSyncerLockTest::UpdateCloudDBData(int64_t begin, int64_t count, int64_t gidStart,
277     int64_t versionStart, const std::string &tableName)
278 {
279     std::this_thread::sleep_for(std::chrono::milliseconds(1));
280     std::vector<VBucket> record;
281     std::vector<VBucket> extend;
282     GenerateDataRecords(begin, count, gidStart, record, extend);
283     for (auto &entry: extend) {
284         entry[CloudDbConstant::VERSION_FIELD] = std::to_string(versionStart++);
285     }
286     ASSERT_EQ(g_virtualCloudDb->BatchUpdate(tableName, std::move(record), extend), DBStatus::OK);
287     std::this_thread::sleep_for(std::chrono::milliseconds(1));
288 }
289 
DeleteCloudDBData(int64_t beginGid,int64_t count,const std::string & tableName)290 void DistributedDBCloudSyncerLockTest::DeleteCloudDBData(int64_t beginGid, int64_t count,
291     const std::string &tableName)
292 {
293     Timestamp now = TimeHelper::GetSysCurrentTime();
294     std::vector<VBucket> extend;
295     for (int64_t i = 0; i < count; ++i) {
296         VBucket log;
297         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
298         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
299         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(beginGid + i));
300         extend.push_back(log);
301     }
302     ASSERT_EQ(g_virtualCloudDb->BatchDelete(tableName, extend), DBStatus::OK);
303     std::this_thread::sleep_for(std::chrono::milliseconds(count));
304 }
305 
PrepareOption(const Query & query,LockAction action,bool isPriorityTask=false,bool isCompensatedSyncOnly=false)306 CloudSyncOption PrepareOption(const Query &query, LockAction action, bool isPriorityTask = false,
307     bool isCompensatedSyncOnly = false)
308 {
309     CloudSyncOption option;
310     option.devices = { "CLOUD" };
311     option.mode = SYNC_MODE_CLOUD_MERGE;
312     option.query = query;
313     option.waitTime = WAIT_TIME;
314     option.priorityTask = isPriorityTask;
315     option.compensatedSyncOnly = isCompensatedSyncOnly;
316     option.lockAction = action;
317     return option;
318 }
319 
CallSync(const CloudSyncOption & option,DBStatus expectResult)320 void DistributedDBCloudSyncerLockTest::CallSync(const CloudSyncOption &option, DBStatus expectResult)
321 {
322     std::mutex dataMutex;
323     std::condition_variable cv;
324     bool finish = false;
325     SyncProcess last;
326     auto callback = [&last, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
327         for (const auto &item: process) {
328             if (item.second.process == DistributedDB::FINISHED) {
329                 {
330                     std::lock_guard<std::mutex> autoLock(dataMutex);
331                     finish = true;
332                     last = item.second;
333                 }
334                 cv.notify_one();
335             }
336         }
337     };
338     ASSERT_EQ(g_delegate->Sync(option, callback), expectResult);
339     if (expectResult == OK) {
340         std::unique_lock<std::mutex> uniqueLock(dataMutex);
341         cv.wait(uniqueLock, [&finish]() {
342             return finish;
343         });
344     }
345     g_syncProcess = last;
346 }
347 
TestConflictSync001(bool isUpdate)348 void DistributedDBCloudSyncerLockTest::TestConflictSync001(bool isUpdate)
349 {
350     /**
351      * @tc.steps:step1. init data and sync
352      * @tc.expected: step1. return ok.
353      */
354     int cloudCount = 20;
355     int localCount = 10;
356     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
357     InsertLocalData(0, localCount, ASSETS_TABLE_NAME, true);
358     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
359     CallSync(option);
360 
361     /**
362      * @tc.steps:step2. update local data to upload, and set hook before upload, operator cloud data which id is 1
363      * @tc.expected: step2. return ok.
364      */
365     std::string sql;
366     if (isUpdate) {
367         sql = "update " + ASSETS_TABLE_NAME + " set name = 'xxx' where id = 1;";
368     } else {
369         sql = "delete from " + ASSETS_TABLE_NAME + " where id = 1;";
370     }
371     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
372     int index = 0;
373     g_cloudStoreHook->SetDoUploadHook([&index, this]() {
374         if (++index == 1) {
375             UpdateCloudDBData(1, 1, 0, 21, ASSETS_TABLE_NAME); // 21 is version
376         }
377     });
378 
379     /**
380      * @tc.steps:step3. sync and check local data
381      * @tc.expected: step3. return ok.
382      */
383     CallSync(option);
384     sql = "select count(*) from " + ASSETS_TABLE_NAME + " where name = 'name30' AND id = '1';";
385     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
386         reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
387 }
388 
CheckAssetStatusNormal()389 void DistributedDBCloudSyncerLockTest::CheckAssetStatusNormal()
390 {
391     std::string sql = "SELECT asset, assets FROM " + ASSETS_TABLE_NAME + ";";
392     sqlite3_stmt *stmt = nullptr;
393     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
394     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
395         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
396         ASSERT_EQ(sqlite3_column_type(stmt, 1), SQLITE_BLOB);
397         Type assetBlob;
398         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Asset>, 0, assetBlob), E_OK);
399         Asset asset = g_virtualCloudDataTranslate->BlobToAsset(std::get<Bytes>(assetBlob));
400         EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
401         Type assetsBlob;
402         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, assetsBlob), E_OK);
403         Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(assetsBlob));
404         for (const auto &as : assets) {
405             EXPECT_EQ(as.status, static_cast<uint32_t>(AssetStatus::NORMAL));
406         }
407     }
408     int errCode = E_OK;
409     SQLiteUtils::ResetStatement(stmt, true, errCode);
410 }
411 
UpdateCloudAssets(Asset & asset,Assets & assets,const std::string & version)412 void DistributedDBCloudSyncerLockTest::UpdateCloudAssets(Asset &asset, Assets &assets, const std::string &version)
413 {
414     std::this_thread::sleep_for(std::chrono::milliseconds(1));
415     VBucket data;
416     std::vector<VBucket> record;
417     std::vector<VBucket> extend;
418     asset.name.empty() ? data.insert_or_assign(COL_ASSET, Nil()) : data.insert_or_assign(COL_ASSET, asset);
419     data.insert_or_assign(COL_ID, 0L);
420     data.insert_or_assign(COL_NAME, "name" + std::to_string(g_nameId++));
421     assets.empty() ? data.insert_or_assign(COL_ASSETS, Nil()) : data.insert_or_assign(COL_ASSETS, assets);
422     record.push_back(data);
423     VBucket log;
424     Timestamp now = TimeHelper::GetSysCurrentTime();
425     log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
426     log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
427     log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
428     log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(0));
429     log.insert_or_assign(CloudDbConstant::VERSION_FIELD, version);
430     extend.push_back(log);
431     ASSERT_EQ(g_virtualCloudDb->BatchUpdate(ASSETS_TABLE_NAME, std::move(record), extend), DBStatus::OK);
432     std::this_thread::sleep_for(std::chrono::milliseconds(1));
433 }
434 
CheckUploadAbnormal(OpType opType,int64_t expCnt,bool isCompensated)435 void DistributedDBCloudSyncerLockTest::CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated)
436 {
437     std::string sql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE ";
438     switch (opType) {
439         case OpType::INSERT:
440             sql += isCompensated ? " cloud_gid != '' AND version !='' AND flag&0x10=0" :
441                    " cloud_gid != '' AND version !='' AND flag=flag|0x10";
442             break;
443         case OpType::UPDATE:
444             sql += isCompensated ? " cloud_gid != '' AND version !='' AND flag&0x10=0" :
445                    " cloud_gid == '' AND version =='' AND flag=flag|0x10";
446             break;
447         case OpType::DELETE:
448             sql += " cloud_gid == '' AND version ==''";
449             break;
450         default:
451             break;
452     }
453     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
454         reinterpret_cast<void *>(expCnt), nullptr), SQLITE_OK);
455 }
456 
457 /**
458  * @tc.name: RDBUnlockCloudSync001
459  * @tc.desc: Test sync with no lock
460  * @tc.type: FUNC
461  * @tc.require:
462  * @tc.author: bty
463  */
464 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBUnlockCloudSync001, TestSize.Level0)
465 {
466     /**
467      * @tc.steps:step1. init data and sync with none lock
468      * @tc.expected: step1. return ok.
469      */
470     int cloudCount = 20;
471     int localCount = 10;
472     InsertLocalData(0, cloudCount, ASSETS_TABLE_NAME, true);
473     InsertCloudDBData(0, localCount, 0, ASSETS_TABLE_NAME);
474     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::NONE);
475     CallSync(option);
476 
477     /**
478      * @tc.steps:step2. insert or replace, check version
479      * @tc.expected: step2. return ok.
480      */
481     std::string sql = "INSERT OR REPLACE INTO " + ASSETS_TABLE_NAME + " VALUES('0', 'XX', '', '');";
482     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
483     sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
484         " where version != '' and version is not null;";
485     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
486         reinterpret_cast<void *>(cloudCount), nullptr), SQLITE_OK);
487 }
488 
489 /**
490  * @tc.name: RDBLockSyncTest001
491  * @tc.desc: Test sync deleted data which status is LOCKING.
492  * @tc.type: FUNC
493  * @tc.require:
494  * @tc.author: liaoyonghuang
495  */
496 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBLockSyncTest001, TestSize.Level0)
497 {
498     /**
499      * @tc.steps:step1. init deleted data which status is LOCKING.
500      * @tc.expected: step1. return ok.
501      */
502     int dataCount = 10;
503     InsertLocalData(0, dataCount, ASSETS_TABLE_NAME, true);
504     CloudSyncOption option1 = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
505     CallSync(option1);
506     std::vector<std::vector<uint8_t>> hashKeys;
507     CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0", db, hashKeys);
508     EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKeys, db), OK);
509     std::string sql = "delete from " + ASSETS_TABLE_NAME + " where _rowid_ = 0";
510     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
511     /**
512      * @tc.steps:step2. sync, check upload info
513      * @tc.expected: step2. return ok.
514      */
515     CloudSyncOption option2 = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT,
516         false, true);
517     CallSync(option2);
518     for (const auto &table : g_syncProcess.tableProcess) {
519         EXPECT_TRUE(table.second.upLoadInfo.successCount != 0u);
520     }
521 }
522 
523 /**
524  * @tc.name: RDBConflictCloudSync001
525  * @tc.desc: Both cloud and local are available, local version is empty, with cloud updates before upload
526  * @tc.type: FUNC
527  * @tc.require:
528  * @tc.author: bty
529  */
530 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync001, TestSize.Level0)
531 {
532     /**
533      * @tc.steps:step1. init data and set hook before upload, update cloud data which gid is 1
534      * @tc.expected: step1. return ok.
535      */
536     int cloudCount = 20;
537     int localCount = 10;
538     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
539     InsertLocalData(0, localCount, ASSETS_TABLE_NAME, true);
540     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
541     int index = 0;
__anoncaa136470502() 542     g_cloudStoreHook->SetDoUploadHook([&index, this]() {
543         if (++index == 1) {
544             UpdateCloudDBData(1, 1, 0, 1, ASSETS_TABLE_NAME);
545         }
546     });
547 
548     /**
549      * @tc.steps:step2. sync and check local data
550      * @tc.expected: step2. return ok.
551      */
552     CallSync(option);
553     std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
554         " where flag&0x02=0 AND version='20' AND cloud_gid = '1';";
555     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
556         reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
557 }
558 
559 /**
560  * @tc.name: RDBConflictCloudSync002
561  * @tc.desc: Both cloud and local are available, with cloud updates before upload
562  * @tc.type: FUNC
563  * @tc.require:
564  * @tc.author: bty
565  */
566 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync002, TestSize.Level0)
567 {
568     TestConflictSync001(true);
569 }
570 
571 /**
572  * @tc.name: RDBConflictCloudSync003
573  * @tc.desc: Both cloud and local are available, with cloud deletes before upload
574  * @tc.type: FUNC
575  * @tc.require:
576  * @tc.author: bty
577  */
578 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync003, TestSize.Level0)
579 {
580     TestConflictSync001(false);
581 }
582 
583 /**
584  * @tc.name: RDBConflictCloudSync003
585  * @tc.desc: Both cloud and local are available, with cloud inserts before upload
586  * @tc.type: FUNC
587  * @tc.require:
588  * @tc.author: bty
589  */
590 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync004, TestSize.Level0)
591 {
592     /**
593      * @tc.steps:step1. init data and sync
594      * @tc.expected: step1. return ok.
595      */
596     int cloudCount = 20;
597     int localCount = 10;
598     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
599     InsertLocalData(0, localCount, ASSETS_TABLE_NAME, true);
600     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
601     CallSync(option);
602 
603     /**
604      * @tc.steps:step2. insert local data and set hook before upload, insert cloud data which id is 20
605      * @tc.expected: step2. return ok.
606      */
607     std::string sql = "INSERT INTO " + ASSETS_TABLE_NAME + " VALUES('20', 'XXX', NULL, NULL);";
608     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
609     int index = 0;
__anoncaa136470602() 610     g_cloudStoreHook->SetDoUploadHook([&index, cloudCount, this]() {
611         if (++index == 1) {
612             InsertCloudDBData(cloudCount, 1, cloudCount, ASSETS_TABLE_NAME);
613         }
614     });
615 
616     /**
617      * @tc.steps:step3. set hook for batch insert, return CLOUD_VERSION_CONFLICT err
618      * @tc.expected: step3. return ok.
619      */
620     g_virtualCloudDb->ForkInsertConflict([](const std::string &tableName, VBucket &extend, VBucket &record,
__anoncaa136470702(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 621         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
622         for (auto &[cloudRecord, cloudExtend]: cloudDataVec) {
623             int64_t cloudPk;
624             CloudStorageUtils::GetValueFromVBucket<int64_t>(COL_ID, record, cloudPk);
625             int64_t localPk;
626             CloudStorageUtils::GetValueFromVBucket<int64_t>(COL_ID, cloudRecord, localPk);
627             if (cloudPk != localPk) {
628                 continue;
629             }
630             std::string localVersion;
631             CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD, extend, localVersion);
632             std::string cloudVersion;
633             CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD, cloudExtend,
634                 cloudVersion);
635             if (localVersion != cloudVersion) {
636                 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_VERSION_CONFLICT);
637                 return CLOUD_VERSION_CONFLICT;
638             }
639         }
640         return OK;
641     });
642 
643     /**
644      * @tc.steps:step3. sync and check local data
645      * @tc.expected: step3. return ok.
646      */
647     CallSync(option);
648     sql = "select count(*) from " + ASSETS_TABLE_NAME + " where name = 'name30' AND id = '20';";
649     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
650         reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
651     for (const auto &table : g_syncProcess.tableProcess) {
652         EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
653     }
654 }
655 
656 /**
657  * @tc.name: QueryCursorTest001
658  * @tc.desc: Test cursor after querying no data
659  * @tc.type: FUNC
660  * @tc.require:
661  * @tc.author: bty
662  */
663 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest001, TestSize.Level0)
664 {
665     /**
666      * @tc.steps:step1. init data and Query with cursor tha exceeds range
667      * @tc.expected: step1. return ok.
668      */
669     int cloudCount = 20;
670     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
671     VBucket extend;
672     extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(30);
673     std::vector<VBucket> data;
674 
675     /**
676      * @tc.steps:step2. check cursor output param
677      * @tc.expected: step2. return QUERY_END.
678      */
679     EXPECT_EQ(g_virtualCloudDb->Query(ASSETS_TABLE_NAME, extend, data), QUERY_END);
680     EXPECT_EQ(std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]), std::to_string(cloudCount));
681 }
682 
683 /**
684  * @tc.name: QueryCursorTest002
685  * @tc.desc: Test cursor in conditional query sync
686  * @tc.type: FUNC
687  * @tc.require:
688  * @tc.author: bty
689  */
690 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest002, TestSize.Level0)
691 {
692     /**
693      * @tc.steps:step1. init data
694      * @tc.expected: step1. return ok.
695      */
696     int count = 10;
697     InsertCloudDBData(0, count, 0, ASSETS_TABLE_NAME);
698     InsertLocalData(0, count, ASSETS_TABLE_NAME, true);
699     std::vector<int> idVec = {2, 3};
700     CloudSyncOption option = PrepareOption(Query::Select().From(ASSETS_TABLE_NAME).In("id", idVec),
701         LockAction::DOWNLOAD, true);
702     int index = 0;
703 
704     /**
705      * @tc.steps:step2. sync and check cursor
706      * @tc.expected: step2. return ok.
707      */
__anoncaa136470802(const std::string &, VBucket &extend) 708     g_virtualCloudDb->ForkQuery([&index](const std::string &, VBucket &extend) {
709         if (index == 1) {
710             std::string cursor;
711             CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::CURSOR_FIELD, extend, cursor);
712             EXPECT_EQ(cursor, std::string(""));
713         }
714         index++;
715     });
716     CallSync(option);
717 }
718 
719 /**
720  * @tc.name: DownloadAssetStatusTest001
721  * @tc.desc: Test download assets status for INSERT
722  * @tc.type: FUNC
723  * @tc.require:
724  * @tc.author: bty
725  */
726 HWTEST_F(DistributedDBCloudSyncerLockTest, DownloadAssetStatusTest001, TestSize.Level0)
727 {
728     /**
729      * @tc.steps:step1. init cloud assert {a, b1, b2}
730      * @tc.expected: step1. return ok.
731      */
732     int count = 1;
733     InsertCloudDBData(0, count, 0, ASSETS_TABLE_NAME);
734     /**
735      * @tc.steps:step2. sync
736      * @tc.expected: step2. assets status is INSERT before download.
737      */
__anoncaa136470902(const std::string &tableName, std::map<std::string, Assets> &assets) 738     g_virtualAssetLoader->ForkDownload([](const std::string &tableName, std::map<std::string, Assets> &assets) {
739         for (const auto &item: assets) {
740             for (const auto &asset: item.second) {
741                 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::INSERT));
742             }
743         }
744     });
745     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
746     CallSync(option);
747     CheckAssetStatusNormal();
748     g_virtualAssetLoader->ForkDownload(nullptr);
749 }
750 
751 /**
752  * @tc.name: DownloadAssetStatusTest002
753  * @tc.desc: Test download assets status for DELETE
754  * @tc.type: FUNC
755  * @tc.require:
756  * @tc.author: bty
757  */
758 HWTEST_F(DistributedDBCloudSyncerLockTest, DownloadAssetStatusTest002, TestSize.Level0)
759 {
760     /**
761      * @tc.steps:step1. init cloud assert {a, b1, b2} and sync to local
762      * @tc.expected: step1. return ok.
763      */
764     int count = 1;
765     InsertCloudDBData(0, count, 0, ASSETS_TABLE_NAME);
766     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
767     CallSync(option);
768 
769     /**
770      * @tc.steps:step2. change cloud assets {b1, b3}
771      * @tc.expected: step2. return ok.
772      */
773     Asset asset = {};
774     Asset b1 = ASSET_COPY;
775     b1.name = ASSET_COPY.name + std::string("0");
776     Asset b2 = ASSET_COPY;
777     b2.name = ASSET_COPY.name + std::string("0") + ASSET_SUFFIX;
778     Asset b3 = ASSET_COPY;
779     b3.name = ASSET_COPY.name + std::string("0") + ASSET_SUFFIX + ASSET_SUFFIX;
780     Assets assets = { b1, b3 };
781     UpdateCloudAssets(asset, assets, std::string("0")); // 1 is version
782     /**
783      * @tc.steps:step3. sync
784      * @tc.expected: step3. download status is a -> DELETE, b2 -> DELETE, b3 -> INSERT
785      */
786     g_virtualAssetLoader->ForkDownload([&b1, &b3](const std::string &tableName,
__anoncaa136470a02(const std::string &tableName, std::map<std::string, Assets> &assets) 787         std::map<std::string, Assets> &assets) {
788         auto it = assets.find(COL_ASSETS);
789         ASSERT_EQ(it != assets.end(), true);
790         ASSERT_EQ(it->second.size(), 1u); // 1 is download size
791         for (const auto &b: it->second) {
792             if (b.name == b3.name) {
793                 EXPECT_EQ(b.status, static_cast<uint32_t>(AssetStatus::INSERT));
794             }
795         }
796     });
__anoncaa136470b02(std::map<std::string, Assets> &assets) 797     g_virtualAssetLoader->SetRemoveLocalAssetsCallback([&b2](std::map<std::string, Assets> &assets) {
798         auto it = assets.find(COL_ASSET);
799         EXPECT_EQ(it != assets.end(), true);
800         EXPECT_EQ(it->second.size(), 1u);
801         EXPECT_EQ(it->second[0].status, static_cast<uint32_t>(AssetStatus::DELETE));
802         it = assets.find(COL_ASSETS);
803         EXPECT_EQ(it != assets.end(), true);
804         EXPECT_EQ(it->second.size(), 1u); // 1 is remove size
805         for (const auto &b: it->second) {
806             if (b.name == b2.name) {
807                 EXPECT_EQ(b.status, static_cast<uint32_t>(AssetStatus::DELETE));
808             }
809         }
810         return DBStatus::OK;
811     });
812     CallSync(option);
813     g_virtualAssetLoader->ForkDownload(nullptr);
814     g_virtualAssetLoader->SetRemoveLocalAssetsCallback(nullptr);
815 }
816 
817 /**
818  * @tc.name: DownloadAssetStatusTest003
819  * @tc.desc: Test download assets status for UPDATE
820  * @tc.type: FUNC
821  * @tc.require:
822  * @tc.author: bty
823  */
824 HWTEST_F(DistributedDBCloudSyncerLockTest, DownloadAssetStatusTest003, TestSize.Level0)
825 {
826     /**
827      * @tc.steps:step1. init cloud assert {a, b1, b2} and sync to local
828      * @tc.expected: step1. return ok.
829      */
830     int count = 1;
831     InsertCloudDBData(0, count, 0, ASSETS_TABLE_NAME);
832     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
833     CallSync(option);
834     /**
835      * @tc.steps:step2. change cloud assets {a, b2}
836      * @tc.expected: step2. return ok.
837      */
838     Asset asset = ASSET_COPY;
839     asset.name = asset.name + "0";
840     asset.hash = "new_hash";
841     Asset b1 = ASSET_COPY;
842     b1.name = ASSET_COPY.name + std::string("0");
843     Asset b2 = ASSET_COPY;
844     b2.name = ASSET_COPY.name + std::string("0") + ASSET_SUFFIX;
845     b2.hash = "new_hash";
846     Assets assets = { b1, b2 };
847     UpdateCloudAssets(asset, assets, std::string("0")); // 1 is version
848     /**
849      * @tc.steps:step3. sync
850      * @tc.expected: step3. download status is a -> UPDATE, b2 -> UPDATE
851      */
852     g_virtualAssetLoader->ForkDownload([&b1, &b2](const std::string &tableName,
__anoncaa136470c02(const std::string &tableName, std::map<std::string, Assets> &assets) 853         std::map<std::string, Assets> &assets) {
854         auto it = assets.find(COL_ASSET);
855         ASSERT_EQ(it != assets.end(), true);
856         ASSERT_EQ(it->second.size(), 1u);
857         EXPECT_EQ(it->second[0].status, static_cast<uint32_t>(AssetStatus::UPDATE));
858 
859         it = assets.find(COL_ASSETS);
860         ASSERT_EQ(it != assets.end(), true);
861         ASSERT_EQ(it->second.size(), 1u); // 1 is download size
862         for (const auto &b: it->second) {
863             if (b.name == b2.name) {
864                 EXPECT_EQ(b.status, static_cast<uint32_t>(AssetStatus::UPDATE));
865             }
866         }
867     });
868     CallSync(option);
869     g_virtualAssetLoader->ForkDownload(nullptr);
870     g_virtualAssetLoader->SetRemoveLocalAssetsCallback(nullptr);
871 }
872 
873 /**
874  * @tc.name: RecordConflictTest001
875  * @tc.desc: Test the asset input param after download return CLOUD_RECORD_EXIST_CONFLICT
876  * @tc.type: FUNC
877  * @tc.require:
878  * @tc.author: bty
879  */
880 HWTEST_F(DistributedDBCloudSyncerLockTest, RecordConflictTest001, TestSize.Level0)
881 {
882     /**
883      * @tc.steps:step1. init data and sync
884      * @tc.expected: step1. return ok.
885      */
886     int count = 10;
887     InsertCloudDBData(0, count, 0, ASSETS_TABLE_NAME);
888     g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
889     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
890     int callCount = 0;
__anoncaa136470d02() 891     g_cloudStoreHook->SetSyncFinishHook([&callCount]() {
892         callCount++;
893         g_processCondition.notify_all();
894     });
895     CallSync(option);
896     {
897         std::unique_lock<std::mutex> lock(g_processMutex);
898         bool result = g_processCondition.wait_for(lock, std::chrono::seconds(WAIT_TIME),
__anoncaa136470e02() 899             [&callCount]() { return callCount == 2; }); // 2 is compensated sync
900         ASSERT_EQ(result, true);
901     }
902 
903     /**
904      * @tc.steps:step2. sync again and check asset
905      * @tc.expected: step2. return ok.
906      */
907     g_virtualAssetLoader->SetDownloadStatus(DBStatus::OK);
__anoncaa136470f02(const std::string &tableName, std::map<std::string, Assets> &assets) 908     g_virtualAssetLoader->ForkDownload([](const std::string &tableName, std::map<std::string, Assets> &assets) {
909         EXPECT_EQ(assets.find(COL_ASSET) != assets.end(), true);
910     });
911     CallSync(option);
912     {
913         std::unique_lock<std::mutex> lock(g_processMutex);
914         bool result = g_processCondition.wait_for(lock, std::chrono::seconds(WAIT_TIME),
__anoncaa136471002() 915             [&callCount]() { return callCount == 4; }); // 4 is compensated sync
916         ASSERT_EQ(result, true);
917     }
918     g_cloudStoreHook->SetSyncFinishHook(nullptr);
919     g_virtualAssetLoader->ForkDownload(nullptr);
920 }
921 
922 /**
923  * @tc.name: QueryCursorTest003
924  * @tc.desc: Test whether cursor fallback
925  * @tc.type: FUNC
926  * @tc.require:
927  * @tc.author: bty
928  */
929 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest003, TestSize.Level0)
930 {
931     /**
932      * @tc.steps:step1. init cloud data and sync
933      * @tc.expected: step1. return ok.
934      */
935     int cloudCount = 10;
936     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
937     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
938     CallSync(option);
939 
940     /**
941      * @tc.steps:step2. delete cloud data and sync
942      * @tc.expected: step2. return ok.
943      */
944     DeleteCloudDBData(0, cloudCount, ASSETS_TABLE_NAME);
945     CallSync(option);
946 
947     /**
948      * @tc.steps:step3. remove data
949      * @tc.expected: step3. return ok.
950      */
951     std::string device = "";
952     ASSERT_EQ(g_delegate->RemoveDeviceData(device, DistributedDB::FLAG_ONLY), DBStatus::OK);
953 
954     /**
955      * @tc.steps:step4. insert local and check cursor
956      * @tc.expected: step4. return ok.
957      */
958     InsertLocalData(0, 1, ASSETS_TABLE_NAME, true);
959     std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
960         " where cursor='31';";
961     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
962         reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
963 }
964 
965 /**
966  * @tc.name: QueryCursorTest004
967  * @tc.desc: Test temp trigger under concurrency
968  * @tc.type: FUNC
969  * @tc.require:
970  * @tc.author: bty
971  */
972 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest004, TestSize.Level0)
973 {
974     /**
975      * @tc.steps:step1. init cloud data
976      * @tc.expected: step1. return ok.
977      */
978     int cloudCount = 10;
979     InsertLocalData(0, cloudCount, ASSETS_TABLE_NAME, true);
980     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
981 
982     /**
983      * @tc.steps:step2. set tracker table before saving cloud data
984      * @tc.expected: step2. return ok.
985      */
__anoncaa136471102(const std::string &table, VBucket &) 986     g_virtualCloudDb->ForkQuery([](const std::string &table, VBucket &) {
987         TrackerSchema schema = {
988             .tableName = ASSETS_TABLE_NAME, .extendColNames = {COL_NAME}, .trackerColNames = { COL_ID }
989         };
990         EXPECT_EQ(g_delegate->SetTrackerTable(schema), WITH_INVENTORY_DATA);
991     });
992     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
993     CallSync(option);
994 
995     /**
996      * @tc.steps:step3. check extend_field and cursor
997      * @tc.expected: step3. return ok.
998      */
999     std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
1000         " where data_key='0' and json_extract(extend_field, '$.name')='name10' and cursor='32';";
1001     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1002         reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
1003 }
1004 
1005 /**
1006  * @tc.name: QueryCursorTest006
1007  * @tc.desc: Test cursor increasing when remove assets fail and download assets success
1008  * @tc.type: FUNC
1009  * @tc.require:
1010  * @tc.author: suyue
1011  */
1012 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest006, TestSize.Level0)
1013 {
1014     RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1015     /**
1016      * @tc.steps:step1. insert local and sync
1017      * @tc.expected: step1. return ok.
1018      */
1019     InsertLocalData(0, 1, ASSETS_TABLE_NAME, false);
1020     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
1021     CallSync(option);
1022 
1023     /**
1024      * @tc.steps:step2. change asset/assets and set BatchRemoveLocalAssets fail
1025      * @tc.expected: step2. return ok.
1026      */
1027     std::string sql = "SELECT asset, assets FROM " + ASSETS_TABLE_NAME + ";";
1028     sqlite3_stmt *stmt = nullptr;
1029     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
1030     Asset asset;
1031     Assets assets;
1032     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1033         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
1034         ASSERT_EQ(sqlite3_column_type(stmt, 1), SQLITE_BLOB);
1035         Type assetBlob;
1036         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Asset>, 0, assetBlob), E_OK);
1037         asset = g_virtualCloudDataTranslate->BlobToAsset(std::get<Bytes>(assetBlob));
1038         Type assetsBlob;
1039         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, assetsBlob), E_OK);
1040         assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(assetsBlob));
1041     }
1042     int errCode = E_OK;
1043     SQLiteUtils::ResetStatement(stmt, true, errCode);
1044     asset.hash = "new_hash";
1045     assets.pop_back();
1046     UpdateCloudAssets(asset, assets, std::string("0"));
1047     g_virtualAssetLoader->SetBatchRemoveStatus(DBStatus::LOCAL_ASSET_NOT_FOUND);
1048 
1049     /**
1050      * @tc.steps:step3. sync and check cursor
1051      * @tc.expected: step3. return ok.
1052      */
1053     CallSync(option);
1054     sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
1055         " where cursor='3';";
1056     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1057         reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
1058     g_virtualAssetLoader->SetBatchRemoveStatus(DBStatus::OK);
1059     EXPECT_EQ(g_syncProcess.errCode, DBStatus::REMOVE_ASSETS_FAIL);
1060     RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1061 }
1062 
1063 /**
1064  * @tc.name: UploadAbnormalSync001
1065  * @tc.desc: Test upload update record, cloud returned record not found.
1066  * @tc.type: FUNC
1067  * @tc.require:
1068  * @tc.author: bty
1069  */
1070 HWTEST_F(DistributedDBCloudSyncerLockTest, UploadAbnormalSync001, TestSize.Level0)
1071 {
1072     /**
1073      * @tc.steps:step1. insert local data and sync
1074      * @tc.expected: step1. return ok.
1075      */
1076     int cloudCount = 1;
1077     InsertLocalData(0, cloudCount, ASSETS_TABLE_NAME, true);
1078     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::DOWNLOAD);
1079     CallSync(option);
1080 
1081     /**
1082      * @tc.steps:step2. update local data and sync, cloud returned record not found.
1083      * @tc.expected: step2. return ok.
1084      */
1085     std::string sql = "update " + ASSETS_TABLE_NAME + " set name = 'xxx' where id = 0;";
1086     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
1087     int upIdx = 0;
__anoncaa136471202(const std::string &tableName, VBucket &extend) 1088     g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
1089         LOGD("cloud db upload index:%d", ++upIdx);
1090         if (upIdx == 1) { // 1 is index
1091             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_NOT_FOUND);
1092         }
1093     });
1094     int doUpIdx = 0;
__anoncaa136471302null1095     g_cloudStoreHook->SetDoUploadHook([&doUpIdx] {
1096         LOGD("begin upload index:%d", ++doUpIdx);
1097     });
1098     int callCount = 0;
__anoncaa136471402() 1099     g_cloudStoreHook->SetSyncFinishHook([&callCount, this]() {
1100         LOGD("sync finish times:%d", ++callCount);
1101         if (callCount == 1) { // 1 is the normal sync
1102             CheckUploadAbnormal(OpType::UPDATE, 1L); // 1 is expected count
1103         } else {
1104             CheckUploadAbnormal(OpType::UPDATE, 1L, true); // 1 is expected count
1105         }
1106         g_processCondition.notify_all();
1107     });
1108     CallSync(option);
1109     {
1110         std::unique_lock<std::mutex> lock(g_processMutex);
1111         bool result = g_processCondition.wait_for(lock, std::chrono::seconds(WAIT_TIME),
__anoncaa136471502() 1112             [&callCount]() { return callCount == 2; }); // 2 is sync times
1113         ASSERT_EQ(result, true);
1114     }
1115 }
1116 
1117 /**
1118  * @tc.name: UploadAbnormalSync002
1119  * @tc.desc: Test upload insert record, cloud returned record already existed.
1120  * @tc.type: FUNC
1121  * @tc.require:
1122  * @tc.author: bty
1123  */
1124 HWTEST_F(DistributedDBCloudSyncerLockTest, UploadAbnormalSync002, TestSize.Level0)
1125 {
1126     /**
1127      * @tc.steps:step1. insert a and sync
1128      * @tc.expected: step1. return ok.
1129      */
1130     int cloudCount = 1;
1131     InsertLocalData(0, cloudCount, ASSETS_TABLE_NAME, true);
1132     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::DOWNLOAD);
1133     CallSync(option);
1134 
1135     /**
1136      * @tc.steps:step2. insert b and sync, cloud returned record not found.
1137      * @tc.expected: step2. return ok.
1138      */
1139     InsertLocalData(cloudCount, cloudCount, ASSETS_TABLE_NAME, true);
1140     int upIdx = 0;
__anoncaa136471602(const std::string &tableName, VBucket &extend) 1141     g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
1142         LOGD("cloud db upload index:%d", ++upIdx);
1143         if (upIdx == 2) { // 2 is index
1144             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_ALREADY_EXISTED);
1145         }
1146     });
1147     int doUpIdx = 0;
__anoncaa136471702null1148     g_cloudStoreHook->SetDoUploadHook([&doUpIdx, cloudCount, this] {
1149         LOGD("begin upload index:%d", ++doUpIdx);
1150         if (doUpIdx == 1) { // 1 is index
1151             InsertCloudDBData(cloudCount, cloudCount, cloudCount, ASSETS_TABLE_NAME);
1152         }
1153     });
1154     int callCount = 0;
__anoncaa136471802() 1155     g_cloudStoreHook->SetSyncFinishHook([&callCount, this]() {
1156         LOGD("sync finish times:%d", ++callCount);
1157         if (callCount == 1) { // 1 is the normal sync
1158             CheckUploadAbnormal(OpType::INSERT, 1L); // 1 is expected count
1159         } else {
1160             CheckUploadAbnormal(OpType::INSERT, 2L, true); // 1 is expected count
1161         }
1162         g_processCondition.notify_all();
1163     });
1164     CallSync(option);
1165     {
1166         std::unique_lock<std::mutex> lock(g_processMutex);
1167         bool result = g_processCondition.wait_for(lock, std::chrono::seconds(WAIT_TIME),
__anoncaa136471902() 1168             [&callCount]() { return callCount == 2; }); // 2 is sync times
1169         ASSERT_EQ(result, true);
1170     }
1171 }
1172 
1173 /**
1174  * @tc.name: UploadAbnormalSync003
1175  * @tc.desc: Test upload delete record, cloud returned record not found.
1176  * @tc.type: FUNC
1177  * @tc.require:
1178  * @tc.author: bty
1179  */
1180 HWTEST_F(DistributedDBCloudSyncerLockTest, UploadAbnormalSync003, TestSize.Level0)
1181 {
1182     /**
1183      * @tc.steps:step1. insert local data and sync
1184      * @tc.expected: step1. return ok.
1185      */
1186     int cloudCount = 1;
1187     InsertLocalData(0, cloudCount, ASSETS_TABLE_NAME, true);
1188     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::DOWNLOAD);
1189     CallSync(option);
1190 
1191     /**
1192      * @tc.steps:step2. delete local data and sync, cloud returned record not found.
1193      * @tc.expected: step2. return ok.
1194      */
1195     std::string sql = "delete from " + ASSETS_TABLE_NAME + " where id = 0;";
1196     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
1197     int upIdx = 0;
__anoncaa136471a02(const std::string &tableName, VBucket &extend) 1198     g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
1199         LOGD("cloud db upload index:%d", ++upIdx);
1200         if (upIdx == 2) { // 2 is index
1201             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_NOT_FOUND);
1202         }
1203     });
1204     int doUpIdx = 0;
__anoncaa136471b02null1205     g_cloudStoreHook->SetDoUploadHook([&doUpIdx, cloudCount, this] {
1206         LOGD("begin upload index:%d", ++doUpIdx);
1207         if (doUpIdx == 1) { // 1 is index
1208             DeleteCloudDBData(0, cloudCount, ASSETS_TABLE_NAME);
1209         }
1210     });
1211     int callCount = 0;
__anoncaa136471c02() 1212     g_cloudStoreHook->SetSyncFinishHook([&callCount, this]() {
1213         LOGD("sync finish times:%d", ++callCount);
1214         if (callCount == 1) { // 1 is the normal sync
1215             CheckUploadAbnormal(OpType::DELETE, 1L); // 1 is expected count
1216         } else {
1217             CheckUploadAbnormal(OpType::DELETE, 1L, true); // 1 is expected count
1218         }
1219         g_processCondition.notify_all();
1220     });
1221     CallSync(option);
1222     {
1223         std::unique_lock<std::mutex> lock(g_processMutex);
1224         bool result = g_processCondition.wait_for(lock, std::chrono::seconds(WAIT_TIME),
__anoncaa136471d02() 1225             [&callCount]() { return callCount == 1; }); // 1 is sync times
1226         ASSERT_EQ(result, true);
1227     }
1228 }
1229 
1230 /**
1231  * @tc.name: ReviseLocalModTimeTest001
1232  * @tc.desc: test sync data with invalid timestamp.
1233  * @tc.type: FUNC
1234  * @tc.require:
1235  * @tc.author: liaoyonghuang
1236  */
1237 HWTEST_F(DistributedDBCloudSyncerLockTest, ReviseLocalModTimeTest001, TestSize.Level0)
1238 {
1239     /**
1240      * @tc.steps:step1. insert local data
1241      * @tc.expected: step1. return ok.
1242      */
1243     int cloudCount = 31; // 31 records
1244     InsertLocalData(0, cloudCount, ASSETS_TABLE_NAME, true);
1245     /**
1246      * @tc.steps:step2. Modify time and sync
1247      * @tc.expected: step2. return ok.
1248      */
1249     uint64_t curTime = 0;
1250     EXPECT_EQ(TimeHelper::GetSysCurrentRawTime(curTime), E_OK);
1251     uint64_t invalidTime = curTime + curTime;
1252     std::string sql = "UPDATE " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
1253         " SET timestamp=" + std::to_string(invalidTime) + " where rowid>0";
1254     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
1255     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
1256     CallSync(option);
1257     /**
1258      * @tc.steps:step3. Check modify time in log table
1259      * @tc.expected: step3. return ok.
1260      */
1261     EXPECT_EQ(TimeHelper::GetSysCurrentRawTime(curTime), E_OK);
1262     sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
1263         " where timestamp < " + std::to_string(curTime);
1264     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1265         reinterpret_cast<void *>(cloudCount), nullptr), SQLITE_OK);
1266 }
1267 
1268 /**
1269  * @tc.name: RemoveAssetsFailTest001
1270  * @tc.desc: Test failCount when remove assets fail
1271  * @tc.type: FUNC
1272  * @tc.require:
1273  * @tc.author: suyue
1274  */
1275 HWTEST_F(DistributedDBCloudSyncerLockTest, RemoveAssetsFailTest001, TestSize.Level0)
1276 {
1277     /**
1278      * @tc.steps:step1. insert local and sync
1279      * @tc.expected: step1. return ok.
1280      */
1281     RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1282     InsertLocalData(0, 1, ASSETS_TABLE_NAME, false);
1283     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
1284     CallSync(option);
1285 
1286     /**
1287      * @tc.steps:step2. change asset and set RemoveLocalAssets fail
1288      * @tc.expected: step2. return ok.
1289      */
1290     std::string sql = "SELECT asset, assets FROM " + ASSETS_TABLE_NAME + ";";
1291     sqlite3_stmt *stmt = nullptr;
1292     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
1293     Asset asset;
1294     Assets assets;
1295     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1296         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
1297         ASSERT_EQ(sqlite3_column_type(stmt, 1), SQLITE_BLOB);
1298         Type assetsBlob;
1299         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, assetsBlob), E_OK);
1300         assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(assetsBlob));
1301         Type assetBlob;
1302         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Asset>, 0, assetBlob), E_OK);
1303         asset = g_virtualCloudDataTranslate->BlobToAsset(std::get<Bytes>(assetBlob));
1304     }
1305     int errCode = E_OK;
1306     SQLiteUtils::ResetStatement(stmt, true, errCode);
1307     asset.hash = "new_hash";
1308     assets.pop_back();
1309     UpdateCloudAssets(asset, assets, std::string("0"));
1310     g_virtualAssetLoader->SetRemoveStatus(DBStatus::LOCAL_ASSET_NOT_FOUND);
1311 
1312     /**
1313      * @tc.steps:step3. sync and check failCount
1314      * @tc.expected: step3. return ok.
1315      */
1316     int downLoadCount = 0;
1317     g_virtualAssetLoader->ForkDownload([this, &downLoadCount](const std::string &tableName,
__anoncaa136471e02(const std::string &tableName, std::map<std::string, Assets> &assets) 1318         std::map<std::string, Assets> &assets) {
1319         downLoadCount++;
1320         if (downLoadCount == 1) {
1321             std::string sql = "delete from " + ASSETS_TABLE_NAME + " WHERE id=0";
1322             EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
1323         }
1324     });
1325     CallSync(option);
1326     for (const auto &table : g_syncProcess.tableProcess) {
1327         EXPECT_EQ(table.second.downLoadInfo.failCount, 1u);
1328     }
1329     g_virtualAssetLoader->SetRemoveStatus(DBStatus::OK);
1330     RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1331     g_virtualAssetLoader->ForkDownload(nullptr);
1332 }
1333 
1334 /**
1335  * @tc.name: CompensateSyncTest001
1336  * @tc.desc: Test only compensates for the sync of deleted data
1337  * @tc.type: FUNC
1338  * @tc.require:
1339  * @tc.author: bty
1340  */
1341 HWTEST_F(DistributedDBCloudSyncerLockTest, CompensateSyncTest001, TestSize.Level1)
1342 {
1343     /**
1344      * @tc.steps:step1. insert cloud and sync
1345      * @tc.expected: step1. return ok.
1346      */
1347     int cloudCount = 30;
1348     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1349     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
1350     CallSync(option);
1351 
1352     /**
1353      * @tc.steps:step2. lock and delete 1-10
1354      * @tc.expected: step2. return ok.
1355      */
1356     std::vector<std::vector<uint8_t>> hashKeys;
1357     CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key < 10 ", db, hashKeys);
1358     EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKeys, db), OK);
1359     std::string sql = "delete from " + ASSETS_TABLE_NAME + " where id < 10;";
1360     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
1361 
1362     /**
1363      * @tc.steps:step3. compensate sync and check query type
1364      * @tc.expected: step3. return ok.
1365      */
__anoncaa136471f02(const std::string &, VBucket &extend) 1366     g_virtualCloudDb->ForkQuery([](const std::string &, VBucket &extend) {
1367         int64_t type;
1368         CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::TYPE_FIELD, extend, type);
1369         EXPECT_EQ(type, 1u);
1370     });
1371     CloudSyncOption cOption = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT,
1372         true, true);
1373     CallSync(cOption);
1374     sleep(1);
1375 
1376     /**
1377      * @tc.steps:step4. lock and delete id 30
1378      * @tc.expected: step4. return ok.
1379      */
1380     InsertLocalData(cloudCount, 1, ASSETS_TABLE_NAME, true);
1381     hashKeys.clear();
1382     CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 30 ", db, hashKeys);
1383     EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKeys, db), OK);
1384     sql = "delete from " + ASSETS_TABLE_NAME + " where id = 30;";
1385     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
1386 
1387     /**
1388      * @tc.steps:step5. compensate sync and check query type
1389      * @tc.expected: step5. return ok.
1390      */
__anoncaa136472002(const std::string &, VBucket &extend) 1391     g_virtualCloudDb->ForkQuery([](const std::string &, VBucket &extend) {
1392         int64_t type;
1393         CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::TYPE_FIELD, extend, type);
1394         EXPECT_EQ(type, 0u);
1395     });
1396     CallSync(cOption);
1397     sleep(1);
1398     g_virtualCloudDb->ForkQuery(nullptr);
1399 }
1400 
1401 /**
1402  * @tc.name: UnLockSyncTest001
1403  * @tc.desc: Test sync after unlock data
1404  * @tc.type: FUNC
1405  * @tc.require:
1406  * @tc.author: suyue
1407  */
1408 HWTEST_F(DistributedDBCloudSyncerLockTest, UnLockSyncTest001, TestSize.Level1)
1409 {
1410     /**
1411      * @tc.steps: step1. insert data and sync
1412      * @tc.expected: step1. return ok.
1413      */
1414     int localCount = 100;
1415     InsertLocalData(0, localCount, ASSETS_TABLE_NAME, true);
1416     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
1417     CallSync(option);
1418 
1419     /**
1420      * @tc.steps: step2. lock 0-70, update all data and unlock
1421      * @tc.expected: step2. unlock return WAIT_COMPENSATED_SYNC.
1422      */
1423     std::vector<std::vector<uint8_t>> hashKeys;
1424     CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key < 70 ", db, hashKeys);
1425     EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKeys, db), OK);
1426     std::string sql = "update " + ASSETS_TABLE_NAME + " set name = 'xxx';";
1427     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
1428 
1429     /**
1430      * @tc.steps: step3. non-compensated sync for condition query and isPriorityTask is true.
1431      * @tc.expected: step3. sync data that is not in the UNLOCKING state.
1432      */
1433     EXPECT_EQ(UnLock(ASSETS_TABLE_NAME, hashKeys, db), WAIT_COMPENSATED_SYNC);
1434     std::vector<int> values;
1435     for (int i = 50; i < 100; i++) {
1436         values.push_back(i);
1437     }
1438     option = PrepareOption(Query::Select().From(ASSETS_TABLE_NAME).In("id", values), LockAction::INSERT, true, false);
1439     CallSync(option);
1440     for (const auto &table : g_syncProcess.tableProcess) {
1441         EXPECT_EQ(table.second.upLoadInfo.total, 30u);
1442     }
1443 
1444     /**
1445      * @tc.steps: step4. compensate sync and check upLoadInfo
1446      * @tc.expected: step4. synch all data to be compensated in the UNLOCKING state.
1447      */
1448     option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT, true, true);
1449     CallSync(option);
1450     for (const auto &table : g_syncProcess.tableProcess) {
1451         EXPECT_EQ(table.second.upLoadInfo.total, 70u);
1452     }
1453 }
1454 
1455 /**
1456  * @tc.name: TaskIdTest001
1457  * @tc.desc: Test sync with specific task id
1458  * @tc.type: FUNC
1459  * @tc.require:
1460  * @tc.author: liaoyonghuang
1461  */
1462 HWTEST_F(DistributedDBCloudSyncerLockTest, TaskIdTest001, TestSize.Level0)
1463 {
1464     /**
1465      * @tc.steps:step1. insert cloud and sync
1466      * @tc.expected: step1. return ok.
1467      */
1468     int cloudCount = 10;
1469     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1470     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
1471     /**
1472      * @tc.steps:step2. sync with specific task id(1) when query
1473      * @tc.expected: step2. return ok.
1474      */
1475     int queryTime = 0;
__anoncaa136472102(const std::string &, VBucket &extend) 1476     g_virtualCloudDb->ForkQuery([&](const std::string &, VBucket &extend) {
1477         if (queryTime == 0) {
1478             queryTime++;
1479             EXPECT_EQ(g_delegate->Sync(option, nullptr, 1u), OK);
1480         }
1481     });
1482     CallSync(option);
1483     /**
1484      * @tc.steps:step3. sync without task id
1485      * @tc.expected: step3. return ok.
1486      */
1487     std::this_thread::sleep_for(std::chrono::milliseconds(200));
__anoncaa136472202(const std::string &, VBucket &extend) 1488     g_virtualCloudDb->ForkQuery([&](const std::string &, VBucket &extend) {
1489         SyncProcess syncProcess = g_delegate->GetCloudTaskStatus(UINT64_MAX - 1);
1490         EXPECT_EQ(syncProcess.errCode, OK);
1491     });
1492     CallSync(option);
1493     g_virtualCloudDb->ForkQuery(nullptr);
1494 }
1495 } // namespace
1496 #endif // RELATIONAL_STORE