1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "cloud/asset_operation_utils.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "process_system_api_adapter_impl.h"
23 #include "relational_store_client.h"
24 #include "relational_store_instance.h"
25 #include "relational_store_manager.h"
26 #include "runtime_config.h"
27 #include "sqlite_relational_store.h"
28 #include "sqlite_relational_utils.h"
29 #include "time_helper.h"
30 #include "virtual_asset_loader.h"
31 #include "virtual_cloud_data_translate.h"
32 #include "virtual_cloud_db.h"
33 #include "virtual_communicator_aggregator.h"
34 #include <gtest/gtest.h>
35 #include <iostream>
36
37 using namespace testing::ext;
38 using namespace DistributedDB;
39 using namespace DistributedDBUnitTest;
40 using namespace std;
41
42 namespace {
43 const string STORE_ID = "Relational_Store_Lock_Sync";
44 const string DB_SUFFIX = ".db";
45 const string ASSETS_TABLE_NAME = "student";
46 const string ASSETS_TABLE_NAME_SHARED = "student_shared";
47 const string DEVICE_CLOUD = "cloud_dev";
48 const string COL_ID = "id";
49 const string COL_NAME = "name";
50 const string COL_HEIGHT = "height";
51 const string COL_ASSET = "asset";
52 const string COL_ASSETS = "assets";
53 const string COL_AGE = "age";
54 const int64_t WAIT_TIME = 5;
55 const std::vector<Field> CLOUD_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true}, {COL_NAME, TYPE_INDEX<std::string>},
56 {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>}};
57 const string CREATE_SINGLE_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + ASSETS_TABLE_NAME + "(" + COL_ID +
58 " INTEGER PRIMARY KEY," + COL_NAME + " TEXT ," + COL_ASSET + " ASSET," + COL_ASSETS + " ASSETS" + ");";
59 const Asset ASSET_COPY = {.version = 1,
60 .name = "Phone",
61 .assetId = "0",
62 .subpath = "/local/sync",
63 .uri = "/local/sync",
64 .modifyTime = "123456",
65 .createTime = "",
66 .size = "256",
67 .hash = "ASE"};
68 const Assets ASSETS_COPY1 = { ASSET_COPY };
69 const string ASSET_SUFFIX = "_copy";
70
71 string g_storePath;
72 string g_testDir;
73 RelationalStoreObserverUnitTest *g_observer = nullptr;
74 DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
75 RelationalStoreDelegate *g_delegate = nullptr;
76 std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
77 std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
78 std::shared_ptr<VirtualCloudDataTranslate> g_virtualCloudDataTranslate;
79 SyncProcess g_syncProcess;
80 std::condition_variable g_processCondition;
81 std::mutex g_processMutex;
82 IRelationalStore *g_store = nullptr;
83 ICloudSyncStorageHook *g_cloudStoreHook = nullptr;
84 int64_t g_nameId;
85 using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
86
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)87 void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
88 {
89 TableSchema assetsTableSchema = {.name = ASSETS_TABLE_NAME, .sharedTableName = ASSETS_TABLE_NAME_SHARED,
90 .fields = CLOUD_FIELDS};
91 dataBaseSchema.tables.push_back(assetsTableSchema);
92 }
93
CloseDb()94 void CloseDb()
95 {
96 if (g_delegate != nullptr) {
97 EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
98 g_delegate = nullptr;
99 }
100 if (g_observer != nullptr) {
101 delete g_observer;
102 g_observer = nullptr;
103 }
104 g_virtualCloudDb = nullptr;
105 }
106
107 class DistributedDBCloudSyncerLockTest : public testing::Test {
108 public:
109 static void SetUpTestCase(void);
110 static void TearDownTestCase(void);
111 void SetUp();
112 void TearDown();
113
114 protected:
115 void Init();
116 const RelationalSyncAbleStorage *GetRelationalStore();
117 void InsertLocalData(int64_t begin, int64_t count, const std::string &tableName, bool isAssetNull = true);
118 void GenerateDataRecords(
119 int64_t begin, int64_t count, int64_t gidStart, std::vector<VBucket> &record, std::vector<VBucket> &extend);
120 void InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart, const std::string &tableName);
121 void UpdateCloudDBData(int64_t begin, int64_t count, int64_t gidStart, int64_t versionStart,
122 const std::string &tableName);
123 void DeleteCloudDBData(int64_t beginGid, int64_t count, const std::string &tableName);
124 void CallSync(const CloudSyncOption &option, DBStatus expectResult = OK);
125
126 void TestConflictSync001(bool isUpdate);
127 void CheckAssetStatusNormal();
128 void UpdateCloudAssets(Asset &asset, Assets &assets, const std::string &version);
129 void CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated = false);
130 sqlite3 *db = nullptr;
131 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
132 };
133
SetUpTestCase(void)134 void DistributedDBCloudSyncerLockTest::SetUpTestCase(void)
135 {
136 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
137 g_storePath = g_testDir + "/" + STORE_ID + DB_SUFFIX;
138 LOGI("The test db is:%s", g_storePath.c_str());
139 g_virtualCloudDataTranslate = std::make_shared<VirtualCloudDataTranslate>();
140 RuntimeConfig::SetCloudTranslate(g_virtualCloudDataTranslate);
141 }
142
TearDownTestCase(void)143 void DistributedDBCloudSyncerLockTest::TearDownTestCase(void) {}
144
SetUp(void)145 void DistributedDBCloudSyncerLockTest::SetUp(void)
146 {
147 RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
148 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
149 LOGE("rm test db files error.");
150 }
151 DistributedDBToolsUnitTest::PrintTestCaseInfo();
152 LOGD("Test dir is %s", g_testDir.c_str());
153 Init();
154 g_cloudStoreHook = (ICloudSyncStorageHook *) GetRelationalStore();
155 ASSERT_NE(g_cloudStoreHook, nullptr);
156 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
157 ASSERT_TRUE(communicatorAggregator_ != nullptr);
158 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
159 }
160
TearDown(void)161 void DistributedDBCloudSyncerLockTest::TearDown(void)
162 {
163 RefObject::DecObjRef(g_store);
164 g_virtualCloudDb->ForkUpload(nullptr);
165 CloseDb();
166 EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
167 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
168 LOGE("rm test db files error.");
169 }
170 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
171 communicatorAggregator_ = nullptr;
172 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
173 }
174
Init()175 void DistributedDBCloudSyncerLockTest::Init()
176 {
177 db = RelationalTestUtils::CreateDataBase(g_storePath);
178 ASSERT_NE(db, nullptr);
179 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
180 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_SINGLE_PRIMARY_KEY_TABLE), SQLITE_OK);
181 g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
182 ASSERT_NE(g_observer, nullptr);
183 ASSERT_EQ(g_mgr.OpenStore(g_storePath, STORE_ID, RelationalStoreDelegate::Option{.observer = g_observer},
184 g_delegate), DBStatus::OK);
185 ASSERT_NE(g_delegate, nullptr);
186 ASSERT_EQ(g_delegate->CreateDistributedTable(ASSETS_TABLE_NAME, CLOUD_COOPERATION), DBStatus::OK);
187 g_virtualCloudDb = make_shared<VirtualCloudDb>();
188 g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
189 g_syncProcess = {};
190 ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
191 ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
192 DataBaseSchema dataBaseSchema;
193 GetCloudDbSchema(dataBaseSchema);
194 ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
195 g_nameId = 0;
196 }
197
GetRelationalStore()198 const RelationalSyncAbleStorage* DistributedDBCloudSyncerLockTest::GetRelationalStore()
199 {
200 RelationalDBProperties properties;
201 CloudDBSyncUtilsTest::InitStoreProp(g_storePath, APP_ID, USER_ID, STORE_ID, properties);
202 int errCode = E_OK;
203 g_store = RelationalStoreInstance::GetDataBase(properties, errCode);
204 if (g_store == nullptr) {
205 return nullptr;
206 }
207 return static_cast<SQLiteRelationalStore *>(g_store)->GetStorageEngine();
208 }
209
210
GenerateDataRecords(int64_t begin,int64_t count,int64_t gidStart,std::vector<VBucket> & record,std::vector<VBucket> & extend)211 void DistributedDBCloudSyncerLockTest::GenerateDataRecords(
212 int64_t begin, int64_t count, int64_t gidStart, std::vector<VBucket> &record, std::vector<VBucket> &extend)
213 {
214 for (int64_t i = begin; i < begin + count; i++) {
215 Assets assets;
216 Asset asset = ASSET_COPY;
217 asset.name = ASSET_COPY.name + std::to_string(i);
218 assets.emplace_back(asset);
219 VBucket data;
220 data.insert_or_assign(COL_ASSET, asset);
221 asset.name = ASSET_COPY.name + std::to_string(i) + "_copy";
222 assets.emplace_back(asset);
223 data.insert_or_assign(COL_ID, i);
224 data.insert_or_assign(COL_NAME, "name" + std::to_string(g_nameId++));
225 data.insert_or_assign(COL_ASSETS, assets);
226 record.push_back(data);
227
228 VBucket log;
229 Timestamp now = TimeHelper::GetSysCurrentTime();
230 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
231 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
232 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
233 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i + gidStart));
234 extend.push_back(log);
235 }
236 }
237
InsertLocalData(int64_t begin,int64_t count,const std::string & tableName,bool isAssetNull)238 void DistributedDBCloudSyncerLockTest::InsertLocalData(int64_t begin, int64_t count,
239 const std::string &tableName, bool isAssetNull)
240 {
241 int errCode;
242 std::vector<VBucket> record;
243 std::vector<VBucket> extend;
244 GenerateDataRecords(begin, count, 0, record, extend);
245 const string sql = "insert or replace into " + tableName + " values (?,?,?,?);";
246 for (VBucket vBucket : record) {
247 sqlite3_stmt *stmt = nullptr;
248 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
249 ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 1, std::get<int64_t>(vBucket[COL_ID])), E_OK); // 1 is id
250 ASSERT_EQ(SQLiteUtils::BindTextToStatement(stmt, 2, std::get<string>(vBucket[COL_NAME])), E_OK); // 2 is name
251 if (isAssetNull) {
252 ASSERT_EQ(sqlite3_bind_null(stmt, 3), SQLITE_OK); // 3 is asset
253 } else {
254 std::vector<uint8_t> assetBlob = g_virtualCloudDataTranslate->AssetToBlob(ASSET_COPY);
255 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 3, assetBlob, false), E_OK); // 3 is asset
256 }
257 std::vector<uint8_t> assetsBlob = g_virtualCloudDataTranslate->AssetsToBlob(
258 std::get<Assets>(vBucket[COL_ASSETS]));
259 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 4, assetsBlob, false), E_OK); // 4 is assets
260 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
261 SQLiteUtils::ResetStatement(stmt, true, errCode);
262 }
263 }
264
InsertCloudDBData(int64_t begin,int64_t count,int64_t gidStart,const std::string & tableName)265 void DistributedDBCloudSyncerLockTest::InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart,
266 const std::string &tableName)
267 {
268 std::this_thread::sleep_for(std::chrono::milliseconds(1));
269 std::vector<VBucket> record;
270 std::vector<VBucket> extend;
271 GenerateDataRecords(begin, count, gidStart, record, extend);
272 ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(tableName, std::move(record), extend), DBStatus::OK);
273 std::this_thread::sleep_for(std::chrono::milliseconds(1));
274 }
275
UpdateCloudDBData(int64_t begin,int64_t count,int64_t gidStart,int64_t versionStart,const std::string & tableName)276 void DistributedDBCloudSyncerLockTest::UpdateCloudDBData(int64_t begin, int64_t count, int64_t gidStart,
277 int64_t versionStart, const std::string &tableName)
278 {
279 std::this_thread::sleep_for(std::chrono::milliseconds(1));
280 std::vector<VBucket> record;
281 std::vector<VBucket> extend;
282 GenerateDataRecords(begin, count, gidStart, record, extend);
283 for (auto &entry: extend) {
284 entry[CloudDbConstant::VERSION_FIELD] = std::to_string(versionStart++);
285 }
286 ASSERT_EQ(g_virtualCloudDb->BatchUpdate(tableName, std::move(record), extend), DBStatus::OK);
287 std::this_thread::sleep_for(std::chrono::milliseconds(1));
288 }
289
DeleteCloudDBData(int64_t beginGid,int64_t count,const std::string & tableName)290 void DistributedDBCloudSyncerLockTest::DeleteCloudDBData(int64_t beginGid, int64_t count,
291 const std::string &tableName)
292 {
293 Timestamp now = TimeHelper::GetSysCurrentTime();
294 std::vector<VBucket> extend;
295 for (int64_t i = 0; i < count; ++i) {
296 VBucket log;
297 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
298 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
299 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(beginGid + i));
300 extend.push_back(log);
301 }
302 ASSERT_EQ(g_virtualCloudDb->BatchDelete(tableName, extend), DBStatus::OK);
303 std::this_thread::sleep_for(std::chrono::milliseconds(count));
304 }
305
PrepareOption(const Query & query,LockAction action,bool isPriorityTask=false,bool isCompensatedSyncOnly=false)306 CloudSyncOption PrepareOption(const Query &query, LockAction action, bool isPriorityTask = false,
307 bool isCompensatedSyncOnly = false)
308 {
309 CloudSyncOption option;
310 option.devices = { "CLOUD" };
311 option.mode = SYNC_MODE_CLOUD_MERGE;
312 option.query = query;
313 option.waitTime = WAIT_TIME;
314 option.priorityTask = isPriorityTask;
315 option.compensatedSyncOnly = isCompensatedSyncOnly;
316 option.lockAction = action;
317 return option;
318 }
319
CallSync(const CloudSyncOption & option,DBStatus expectResult)320 void DistributedDBCloudSyncerLockTest::CallSync(const CloudSyncOption &option, DBStatus expectResult)
321 {
322 std::mutex dataMutex;
323 std::condition_variable cv;
324 bool finish = false;
325 SyncProcess last;
326 auto callback = [&last, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
327 for (const auto &item: process) {
328 if (item.second.process == DistributedDB::FINISHED) {
329 {
330 std::lock_guard<std::mutex> autoLock(dataMutex);
331 finish = true;
332 last = item.second;
333 }
334 cv.notify_one();
335 }
336 }
337 };
338 ASSERT_EQ(g_delegate->Sync(option, callback), expectResult);
339 if (expectResult == OK) {
340 std::unique_lock<std::mutex> uniqueLock(dataMutex);
341 cv.wait(uniqueLock, [&finish]() {
342 return finish;
343 });
344 }
345 g_syncProcess = last;
346 }
347
TestConflictSync001(bool isUpdate)348 void DistributedDBCloudSyncerLockTest::TestConflictSync001(bool isUpdate)
349 {
350 /**
351 * @tc.steps:step1. init data and sync
352 * @tc.expected: step1. return ok.
353 */
354 int cloudCount = 20;
355 int localCount = 10;
356 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
357 InsertLocalData(0, localCount, ASSETS_TABLE_NAME, true);
358 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
359 CallSync(option);
360
361 /**
362 * @tc.steps:step2. update local data to upload, and set hook before upload, operator cloud data which id is 1
363 * @tc.expected: step2. return ok.
364 */
365 std::string sql;
366 if (isUpdate) {
367 sql = "update " + ASSETS_TABLE_NAME + " set name = 'xxx' where id = 1;";
368 } else {
369 sql = "delete from " + ASSETS_TABLE_NAME + " where id = 1;";
370 }
371 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
372 int index = 0;
373 g_cloudStoreHook->SetDoUploadHook([&index, this]() {
374 if (++index == 1) {
375 UpdateCloudDBData(1, 1, 0, 21, ASSETS_TABLE_NAME); // 21 is version
376 }
377 });
378
379 /**
380 * @tc.steps:step3. sync and check local data
381 * @tc.expected: step3. return ok.
382 */
383 CallSync(option);
384 sql = "select count(*) from " + ASSETS_TABLE_NAME + " where name = 'name30' AND id = '1';";
385 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
386 reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
387 }
388
CheckAssetStatusNormal()389 void DistributedDBCloudSyncerLockTest::CheckAssetStatusNormal()
390 {
391 std::string sql = "SELECT asset, assets FROM " + ASSETS_TABLE_NAME + ";";
392 sqlite3_stmt *stmt = nullptr;
393 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
394 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
395 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
396 ASSERT_EQ(sqlite3_column_type(stmt, 1), SQLITE_BLOB);
397 Type assetBlob;
398 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Asset>, 0, assetBlob), E_OK);
399 Asset asset = g_virtualCloudDataTranslate->BlobToAsset(std::get<Bytes>(assetBlob));
400 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
401 Type assetsBlob;
402 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, assetsBlob), E_OK);
403 Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(assetsBlob));
404 for (const auto &as : assets) {
405 EXPECT_EQ(as.status, static_cast<uint32_t>(AssetStatus::NORMAL));
406 }
407 }
408 int errCode = E_OK;
409 SQLiteUtils::ResetStatement(stmt, true, errCode);
410 }
411
UpdateCloudAssets(Asset & asset,Assets & assets,const std::string & version)412 void DistributedDBCloudSyncerLockTest::UpdateCloudAssets(Asset &asset, Assets &assets, const std::string &version)
413 {
414 std::this_thread::sleep_for(std::chrono::milliseconds(1));
415 VBucket data;
416 std::vector<VBucket> record;
417 std::vector<VBucket> extend;
418 asset.name.empty() ? data.insert_or_assign(COL_ASSET, Nil()) : data.insert_or_assign(COL_ASSET, asset);
419 data.insert_or_assign(COL_ID, 0L);
420 data.insert_or_assign(COL_NAME, "name" + std::to_string(g_nameId++));
421 assets.empty() ? data.insert_or_assign(COL_ASSETS, Nil()) : data.insert_or_assign(COL_ASSETS, assets);
422 record.push_back(data);
423 VBucket log;
424 Timestamp now = TimeHelper::GetSysCurrentTime();
425 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
426 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
427 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
428 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(0));
429 log.insert_or_assign(CloudDbConstant::VERSION_FIELD, version);
430 extend.push_back(log);
431 ASSERT_EQ(g_virtualCloudDb->BatchUpdate(ASSETS_TABLE_NAME, std::move(record), extend), DBStatus::OK);
432 std::this_thread::sleep_for(std::chrono::milliseconds(1));
433 }
434
CheckUploadAbnormal(OpType opType,int64_t expCnt,bool isCompensated)435 void DistributedDBCloudSyncerLockTest::CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated)
436 {
437 std::string sql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE ";
438 switch (opType) {
439 case OpType::INSERT:
440 sql += isCompensated ? " cloud_gid != '' AND version !='' AND flag&0x10=0" :
441 " cloud_gid != '' AND version !='' AND flag=flag|0x10";
442 break;
443 case OpType::UPDATE:
444 sql += isCompensated ? " cloud_gid != '' AND version !='' AND flag&0x10=0" :
445 " cloud_gid == '' AND version =='' AND flag=flag|0x10";
446 break;
447 case OpType::DELETE:
448 sql += " cloud_gid == '' AND version ==''";
449 break;
450 default:
451 break;
452 }
453 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
454 reinterpret_cast<void *>(expCnt), nullptr), SQLITE_OK);
455 }
456
457 /**
458 * @tc.name: RDBUnlockCloudSync001
459 * @tc.desc: Test sync with no lock
460 * @tc.type: FUNC
461 * @tc.require:
462 * @tc.author: bty
463 */
464 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBUnlockCloudSync001, TestSize.Level0)
465 {
466 /**
467 * @tc.steps:step1. init data and sync with none lock
468 * @tc.expected: step1. return ok.
469 */
470 int cloudCount = 20;
471 int localCount = 10;
472 InsertLocalData(0, cloudCount, ASSETS_TABLE_NAME, true);
473 InsertCloudDBData(0, localCount, 0, ASSETS_TABLE_NAME);
474 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::NONE);
475 CallSync(option);
476
477 /**
478 * @tc.steps:step2. insert or replace, check version
479 * @tc.expected: step2. return ok.
480 */
481 std::string sql = "INSERT OR REPLACE INTO " + ASSETS_TABLE_NAME + " VALUES('0', 'XX', '', '');";
482 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
483 sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
484 " where version != '' and version is not null;";
485 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
486 reinterpret_cast<void *>(cloudCount), nullptr), SQLITE_OK);
487 }
488
489 /**
490 * @tc.name: RDBLockSyncTest001
491 * @tc.desc: Test sync deleted data which status is LOCKING.
492 * @tc.type: FUNC
493 * @tc.require:
494 * @tc.author: liaoyonghuang
495 */
496 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBLockSyncTest001, TestSize.Level0)
497 {
498 /**
499 * @tc.steps:step1. init deleted data which status is LOCKING.
500 * @tc.expected: step1. return ok.
501 */
502 int dataCount = 10;
503 InsertLocalData(0, dataCount, ASSETS_TABLE_NAME, true);
504 CloudSyncOption option1 = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
505 CallSync(option1);
506 std::vector<std::vector<uint8_t>> hashKeys;
507 CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0", db, hashKeys);
508 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKeys, db), OK);
509 std::string sql = "delete from " + ASSETS_TABLE_NAME + " where _rowid_ = 0";
510 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
511 /**
512 * @tc.steps:step2. sync, check upload info
513 * @tc.expected: step2. return ok.
514 */
515 CloudSyncOption option2 = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT,
516 false, true);
517 CallSync(option2);
518 for (const auto &table : g_syncProcess.tableProcess) {
519 EXPECT_TRUE(table.second.upLoadInfo.successCount != 0u);
520 }
521 }
522
523 /**
524 * @tc.name: RDBConflictCloudSync001
525 * @tc.desc: Both cloud and local are available, local version is empty, with cloud updates before upload
526 * @tc.type: FUNC
527 * @tc.require:
528 * @tc.author: bty
529 */
530 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync001, TestSize.Level0)
531 {
532 /**
533 * @tc.steps:step1. init data and set hook before upload, update cloud data which gid is 1
534 * @tc.expected: step1. return ok.
535 */
536 int cloudCount = 20;
537 int localCount = 10;
538 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
539 InsertLocalData(0, localCount, ASSETS_TABLE_NAME, true);
540 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
541 int index = 0;
__anoncaa136470502() 542 g_cloudStoreHook->SetDoUploadHook([&index, this]() {
543 if (++index == 1) {
544 UpdateCloudDBData(1, 1, 0, 1, ASSETS_TABLE_NAME);
545 }
546 });
547
548 /**
549 * @tc.steps:step2. sync and check local data
550 * @tc.expected: step2. return ok.
551 */
552 CallSync(option);
553 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
554 " where flag&0x02=0 AND version='20' AND cloud_gid = '1';";
555 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
556 reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
557 }
558
559 /**
560 * @tc.name: RDBConflictCloudSync002
561 * @tc.desc: Both cloud and local are available, with cloud updates before upload
562 * @tc.type: FUNC
563 * @tc.require:
564 * @tc.author: bty
565 */
566 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync002, TestSize.Level0)
567 {
568 TestConflictSync001(true);
569 }
570
571 /**
572 * @tc.name: RDBConflictCloudSync003
573 * @tc.desc: Both cloud and local are available, with cloud deletes before upload
574 * @tc.type: FUNC
575 * @tc.require:
576 * @tc.author: bty
577 */
578 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync003, TestSize.Level0)
579 {
580 TestConflictSync001(false);
581 }
582
583 /**
584 * @tc.name: RDBConflictCloudSync003
585 * @tc.desc: Both cloud and local are available, with cloud inserts before upload
586 * @tc.type: FUNC
587 * @tc.require:
588 * @tc.author: bty
589 */
590 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync004, TestSize.Level0)
591 {
592 /**
593 * @tc.steps:step1. init data and sync
594 * @tc.expected: step1. return ok.
595 */
596 int cloudCount = 20;
597 int localCount = 10;
598 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
599 InsertLocalData(0, localCount, ASSETS_TABLE_NAME, true);
600 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
601 CallSync(option);
602
603 /**
604 * @tc.steps:step2. insert local data and set hook before upload, insert cloud data which id is 20
605 * @tc.expected: step2. return ok.
606 */
607 std::string sql = "INSERT INTO " + ASSETS_TABLE_NAME + " VALUES('20', 'XXX', NULL, NULL);";
608 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
609 int index = 0;
__anoncaa136470602() 610 g_cloudStoreHook->SetDoUploadHook([&index, cloudCount, this]() {
611 if (++index == 1) {
612 InsertCloudDBData(cloudCount, 1, cloudCount, ASSETS_TABLE_NAME);
613 }
614 });
615
616 /**
617 * @tc.steps:step3. set hook for batch insert, return CLOUD_VERSION_CONFLICT err
618 * @tc.expected: step3. return ok.
619 */
620 g_virtualCloudDb->ForkInsertConflict([](const std::string &tableName, VBucket &extend, VBucket &record,
__anoncaa136470702(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 621 std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
622 for (auto &[cloudRecord, cloudExtend]: cloudDataVec) {
623 int64_t cloudPk;
624 CloudStorageUtils::GetValueFromVBucket<int64_t>(COL_ID, record, cloudPk);
625 int64_t localPk;
626 CloudStorageUtils::GetValueFromVBucket<int64_t>(COL_ID, cloudRecord, localPk);
627 if (cloudPk != localPk) {
628 continue;
629 }
630 std::string localVersion;
631 CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD, extend, localVersion);
632 std::string cloudVersion;
633 CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD, cloudExtend,
634 cloudVersion);
635 if (localVersion != cloudVersion) {
636 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_VERSION_CONFLICT);
637 return CLOUD_VERSION_CONFLICT;
638 }
639 }
640 return OK;
641 });
642
643 /**
644 * @tc.steps:step3. sync and check local data
645 * @tc.expected: step3. return ok.
646 */
647 CallSync(option);
648 sql = "select count(*) from " + ASSETS_TABLE_NAME + " where name = 'name30' AND id = '20';";
649 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
650 reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
651 for (const auto &table : g_syncProcess.tableProcess) {
652 EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
653 }
654 }
655
656 /**
657 * @tc.name: QueryCursorTest001
658 * @tc.desc: Test cursor after querying no data
659 * @tc.type: FUNC
660 * @tc.require:
661 * @tc.author: bty
662 */
663 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest001, TestSize.Level0)
664 {
665 /**
666 * @tc.steps:step1. init data and Query with cursor tha exceeds range
667 * @tc.expected: step1. return ok.
668 */
669 int cloudCount = 20;
670 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
671 VBucket extend;
672 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(30);
673 std::vector<VBucket> data;
674
675 /**
676 * @tc.steps:step2. check cursor output param
677 * @tc.expected: step2. return QUERY_END.
678 */
679 EXPECT_EQ(g_virtualCloudDb->Query(ASSETS_TABLE_NAME, extend, data), QUERY_END);
680 EXPECT_EQ(std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]), std::to_string(cloudCount));
681 }
682
683 /**
684 * @tc.name: QueryCursorTest002
685 * @tc.desc: Test cursor in conditional query sync
686 * @tc.type: FUNC
687 * @tc.require:
688 * @tc.author: bty
689 */
690 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest002, TestSize.Level0)
691 {
692 /**
693 * @tc.steps:step1. init data
694 * @tc.expected: step1. return ok.
695 */
696 int count = 10;
697 InsertCloudDBData(0, count, 0, ASSETS_TABLE_NAME);
698 InsertLocalData(0, count, ASSETS_TABLE_NAME, true);
699 std::vector<int> idVec = {2, 3};
700 CloudSyncOption option = PrepareOption(Query::Select().From(ASSETS_TABLE_NAME).In("id", idVec),
701 LockAction::DOWNLOAD, true);
702 int index = 0;
703
704 /**
705 * @tc.steps:step2. sync and check cursor
706 * @tc.expected: step2. return ok.
707 */
__anoncaa136470802(const std::string &, VBucket &extend) 708 g_virtualCloudDb->ForkQuery([&index](const std::string &, VBucket &extend) {
709 if (index == 1) {
710 std::string cursor;
711 CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::CURSOR_FIELD, extend, cursor);
712 EXPECT_EQ(cursor, std::string(""));
713 }
714 index++;
715 });
716 CallSync(option);
717 }
718
719 /**
720 * @tc.name: DownloadAssetStatusTest001
721 * @tc.desc: Test download assets status for INSERT
722 * @tc.type: FUNC
723 * @tc.require:
724 * @tc.author: bty
725 */
726 HWTEST_F(DistributedDBCloudSyncerLockTest, DownloadAssetStatusTest001, TestSize.Level0)
727 {
728 /**
729 * @tc.steps:step1. init cloud assert {a, b1, b2}
730 * @tc.expected: step1. return ok.
731 */
732 int count = 1;
733 InsertCloudDBData(0, count, 0, ASSETS_TABLE_NAME);
734 /**
735 * @tc.steps:step2. sync
736 * @tc.expected: step2. assets status is INSERT before download.
737 */
__anoncaa136470902(const std::string &tableName, std::map<std::string, Assets> &assets) 738 g_virtualAssetLoader->ForkDownload([](const std::string &tableName, std::map<std::string, Assets> &assets) {
739 for (const auto &item: assets) {
740 for (const auto &asset: item.second) {
741 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::INSERT));
742 }
743 }
744 });
745 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
746 CallSync(option);
747 CheckAssetStatusNormal();
748 g_virtualAssetLoader->ForkDownload(nullptr);
749 }
750
751 /**
752 * @tc.name: DownloadAssetStatusTest002
753 * @tc.desc: Test download assets status for DELETE
754 * @tc.type: FUNC
755 * @tc.require:
756 * @tc.author: bty
757 */
758 HWTEST_F(DistributedDBCloudSyncerLockTest, DownloadAssetStatusTest002, TestSize.Level0)
759 {
760 /**
761 * @tc.steps:step1. init cloud assert {a, b1, b2} and sync to local
762 * @tc.expected: step1. return ok.
763 */
764 int count = 1;
765 InsertCloudDBData(0, count, 0, ASSETS_TABLE_NAME);
766 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
767 CallSync(option);
768
769 /**
770 * @tc.steps:step2. change cloud assets {b1, b3}
771 * @tc.expected: step2. return ok.
772 */
773 Asset asset = {};
774 Asset b1 = ASSET_COPY;
775 b1.name = ASSET_COPY.name + std::string("0");
776 Asset b2 = ASSET_COPY;
777 b2.name = ASSET_COPY.name + std::string("0") + ASSET_SUFFIX;
778 Asset b3 = ASSET_COPY;
779 b3.name = ASSET_COPY.name + std::string("0") + ASSET_SUFFIX + ASSET_SUFFIX;
780 Assets assets = { b1, b3 };
781 UpdateCloudAssets(asset, assets, std::string("0")); // 1 is version
782 /**
783 * @tc.steps:step3. sync
784 * @tc.expected: step3. download status is a -> DELETE, b2 -> DELETE, b3 -> INSERT
785 */
786 g_virtualAssetLoader->ForkDownload([&b1, &b3](const std::string &tableName,
__anoncaa136470a02(const std::string &tableName, std::map<std::string, Assets> &assets) 787 std::map<std::string, Assets> &assets) {
788 auto it = assets.find(COL_ASSETS);
789 ASSERT_EQ(it != assets.end(), true);
790 ASSERT_EQ(it->second.size(), 1u); // 1 is download size
791 for (const auto &b: it->second) {
792 if (b.name == b3.name) {
793 EXPECT_EQ(b.status, static_cast<uint32_t>(AssetStatus::INSERT));
794 }
795 }
796 });
__anoncaa136470b02(std::map<std::string, Assets> &assets) 797 g_virtualAssetLoader->SetRemoveLocalAssetsCallback([&b2](std::map<std::string, Assets> &assets) {
798 auto it = assets.find(COL_ASSET);
799 EXPECT_EQ(it != assets.end(), true);
800 EXPECT_EQ(it->second.size(), 1u);
801 EXPECT_EQ(it->second[0].status, static_cast<uint32_t>(AssetStatus::DELETE));
802 it = assets.find(COL_ASSETS);
803 EXPECT_EQ(it != assets.end(), true);
804 EXPECT_EQ(it->second.size(), 1u); // 1 is remove size
805 for (const auto &b: it->second) {
806 if (b.name == b2.name) {
807 EXPECT_EQ(b.status, static_cast<uint32_t>(AssetStatus::DELETE));
808 }
809 }
810 return DBStatus::OK;
811 });
812 CallSync(option);
813 g_virtualAssetLoader->ForkDownload(nullptr);
814 g_virtualAssetLoader->SetRemoveLocalAssetsCallback(nullptr);
815 }
816
817 /**
818 * @tc.name: DownloadAssetStatusTest003
819 * @tc.desc: Test download assets status for UPDATE
820 * @tc.type: FUNC
821 * @tc.require:
822 * @tc.author: bty
823 */
824 HWTEST_F(DistributedDBCloudSyncerLockTest, DownloadAssetStatusTest003, TestSize.Level0)
825 {
826 /**
827 * @tc.steps:step1. init cloud assert {a, b1, b2} and sync to local
828 * @tc.expected: step1. return ok.
829 */
830 int count = 1;
831 InsertCloudDBData(0, count, 0, ASSETS_TABLE_NAME);
832 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
833 CallSync(option);
834 /**
835 * @tc.steps:step2. change cloud assets {a, b2}
836 * @tc.expected: step2. return ok.
837 */
838 Asset asset = ASSET_COPY;
839 asset.name = asset.name + "0";
840 asset.hash = "new_hash";
841 Asset b1 = ASSET_COPY;
842 b1.name = ASSET_COPY.name + std::string("0");
843 Asset b2 = ASSET_COPY;
844 b2.name = ASSET_COPY.name + std::string("0") + ASSET_SUFFIX;
845 b2.hash = "new_hash";
846 Assets assets = { b1, b2 };
847 UpdateCloudAssets(asset, assets, std::string("0")); // 1 is version
848 /**
849 * @tc.steps:step3. sync
850 * @tc.expected: step3. download status is a -> UPDATE, b2 -> UPDATE
851 */
852 g_virtualAssetLoader->ForkDownload([&b1, &b2](const std::string &tableName,
__anoncaa136470c02(const std::string &tableName, std::map<std::string, Assets> &assets) 853 std::map<std::string, Assets> &assets) {
854 auto it = assets.find(COL_ASSET);
855 ASSERT_EQ(it != assets.end(), true);
856 ASSERT_EQ(it->second.size(), 1u);
857 EXPECT_EQ(it->second[0].status, static_cast<uint32_t>(AssetStatus::UPDATE));
858
859 it = assets.find(COL_ASSETS);
860 ASSERT_EQ(it != assets.end(), true);
861 ASSERT_EQ(it->second.size(), 1u); // 1 is download size
862 for (const auto &b: it->second) {
863 if (b.name == b2.name) {
864 EXPECT_EQ(b.status, static_cast<uint32_t>(AssetStatus::UPDATE));
865 }
866 }
867 });
868 CallSync(option);
869 g_virtualAssetLoader->ForkDownload(nullptr);
870 g_virtualAssetLoader->SetRemoveLocalAssetsCallback(nullptr);
871 }
872
873 /**
874 * @tc.name: RecordConflictTest001
875 * @tc.desc: Test the asset input param after download return CLOUD_RECORD_EXIST_CONFLICT
876 * @tc.type: FUNC
877 * @tc.require:
878 * @tc.author: bty
879 */
880 HWTEST_F(DistributedDBCloudSyncerLockTest, RecordConflictTest001, TestSize.Level0)
881 {
882 /**
883 * @tc.steps:step1. init data and sync
884 * @tc.expected: step1. return ok.
885 */
886 int count = 10;
887 InsertCloudDBData(0, count, 0, ASSETS_TABLE_NAME);
888 g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
889 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
890 int callCount = 0;
__anoncaa136470d02() 891 g_cloudStoreHook->SetSyncFinishHook([&callCount]() {
892 callCount++;
893 g_processCondition.notify_all();
894 });
895 CallSync(option);
896 {
897 std::unique_lock<std::mutex> lock(g_processMutex);
898 bool result = g_processCondition.wait_for(lock, std::chrono::seconds(WAIT_TIME),
__anoncaa136470e02() 899 [&callCount]() { return callCount == 2; }); // 2 is compensated sync
900 ASSERT_EQ(result, true);
901 }
902
903 /**
904 * @tc.steps:step2. sync again and check asset
905 * @tc.expected: step2. return ok.
906 */
907 g_virtualAssetLoader->SetDownloadStatus(DBStatus::OK);
__anoncaa136470f02(const std::string &tableName, std::map<std::string, Assets> &assets) 908 g_virtualAssetLoader->ForkDownload([](const std::string &tableName, std::map<std::string, Assets> &assets) {
909 EXPECT_EQ(assets.find(COL_ASSET) != assets.end(), true);
910 });
911 CallSync(option);
912 {
913 std::unique_lock<std::mutex> lock(g_processMutex);
914 bool result = g_processCondition.wait_for(lock, std::chrono::seconds(WAIT_TIME),
__anoncaa136471002() 915 [&callCount]() { return callCount == 4; }); // 4 is compensated sync
916 ASSERT_EQ(result, true);
917 }
918 g_cloudStoreHook->SetSyncFinishHook(nullptr);
919 g_virtualAssetLoader->ForkDownload(nullptr);
920 }
921
922 /**
923 * @tc.name: QueryCursorTest003
924 * @tc.desc: Test whether cursor fallback
925 * @tc.type: FUNC
926 * @tc.require:
927 * @tc.author: bty
928 */
929 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest003, TestSize.Level0)
930 {
931 /**
932 * @tc.steps:step1. init cloud data and sync
933 * @tc.expected: step1. return ok.
934 */
935 int cloudCount = 10;
936 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
937 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
938 CallSync(option);
939
940 /**
941 * @tc.steps:step2. delete cloud data and sync
942 * @tc.expected: step2. return ok.
943 */
944 DeleteCloudDBData(0, cloudCount, ASSETS_TABLE_NAME);
945 CallSync(option);
946
947 /**
948 * @tc.steps:step3. remove data
949 * @tc.expected: step3. return ok.
950 */
951 std::string device = "";
952 ASSERT_EQ(g_delegate->RemoveDeviceData(device, DistributedDB::FLAG_ONLY), DBStatus::OK);
953
954 /**
955 * @tc.steps:step4. insert local and check cursor
956 * @tc.expected: step4. return ok.
957 */
958 InsertLocalData(0, 1, ASSETS_TABLE_NAME, true);
959 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
960 " where cursor='31';";
961 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
962 reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
963 }
964
965 /**
966 * @tc.name: QueryCursorTest004
967 * @tc.desc: Test temp trigger under concurrency
968 * @tc.type: FUNC
969 * @tc.require:
970 * @tc.author: bty
971 */
972 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest004, TestSize.Level0)
973 {
974 /**
975 * @tc.steps:step1. init cloud data
976 * @tc.expected: step1. return ok.
977 */
978 int cloudCount = 10;
979 InsertLocalData(0, cloudCount, ASSETS_TABLE_NAME, true);
980 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
981
982 /**
983 * @tc.steps:step2. set tracker table before saving cloud data
984 * @tc.expected: step2. return ok.
985 */
__anoncaa136471102(const std::string &table, VBucket &) 986 g_virtualCloudDb->ForkQuery([](const std::string &table, VBucket &) {
987 TrackerSchema schema = {
988 .tableName = ASSETS_TABLE_NAME, .extendColNames = {COL_NAME}, .trackerColNames = { COL_ID }
989 };
990 EXPECT_EQ(g_delegate->SetTrackerTable(schema), WITH_INVENTORY_DATA);
991 });
992 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
993 CallSync(option);
994
995 /**
996 * @tc.steps:step3. check extend_field and cursor
997 * @tc.expected: step3. return ok.
998 */
999 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
1000 " where data_key='0' and json_extract(extend_field, '$.name')='name10' and cursor='32';";
1001 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1002 reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
1003 }
1004
1005 /**
1006 * @tc.name: QueryCursorTest006
1007 * @tc.desc: Test cursor increasing when remove assets fail and download assets success
1008 * @tc.type: FUNC
1009 * @tc.require:
1010 * @tc.author: suyue
1011 */
1012 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest006, TestSize.Level0)
1013 {
1014 RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1015 /**
1016 * @tc.steps:step1. insert local and sync
1017 * @tc.expected: step1. return ok.
1018 */
1019 InsertLocalData(0, 1, ASSETS_TABLE_NAME, false);
1020 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
1021 CallSync(option);
1022
1023 /**
1024 * @tc.steps:step2. change asset/assets and set BatchRemoveLocalAssets fail
1025 * @tc.expected: step2. return ok.
1026 */
1027 std::string sql = "SELECT asset, assets FROM " + ASSETS_TABLE_NAME + ";";
1028 sqlite3_stmt *stmt = nullptr;
1029 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
1030 Asset asset;
1031 Assets assets;
1032 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1033 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
1034 ASSERT_EQ(sqlite3_column_type(stmt, 1), SQLITE_BLOB);
1035 Type assetBlob;
1036 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Asset>, 0, assetBlob), E_OK);
1037 asset = g_virtualCloudDataTranslate->BlobToAsset(std::get<Bytes>(assetBlob));
1038 Type assetsBlob;
1039 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, assetsBlob), E_OK);
1040 assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(assetsBlob));
1041 }
1042 int errCode = E_OK;
1043 SQLiteUtils::ResetStatement(stmt, true, errCode);
1044 asset.hash = "new_hash";
1045 assets.pop_back();
1046 UpdateCloudAssets(asset, assets, std::string("0"));
1047 g_virtualAssetLoader->SetBatchRemoveStatus(DBStatus::LOCAL_ASSET_NOT_FOUND);
1048
1049 /**
1050 * @tc.steps:step3. sync and check cursor
1051 * @tc.expected: step3. return ok.
1052 */
1053 CallSync(option);
1054 sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
1055 " where cursor='3';";
1056 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1057 reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
1058 g_virtualAssetLoader->SetBatchRemoveStatus(DBStatus::OK);
1059 EXPECT_EQ(g_syncProcess.errCode, DBStatus::REMOVE_ASSETS_FAIL);
1060 RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1061 }
1062
1063 /**
1064 * @tc.name: UploadAbnormalSync001
1065 * @tc.desc: Test upload update record, cloud returned record not found.
1066 * @tc.type: FUNC
1067 * @tc.require:
1068 * @tc.author: bty
1069 */
1070 HWTEST_F(DistributedDBCloudSyncerLockTest, UploadAbnormalSync001, TestSize.Level0)
1071 {
1072 /**
1073 * @tc.steps:step1. insert local data and sync
1074 * @tc.expected: step1. return ok.
1075 */
1076 int cloudCount = 1;
1077 InsertLocalData(0, cloudCount, ASSETS_TABLE_NAME, true);
1078 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::DOWNLOAD);
1079 CallSync(option);
1080
1081 /**
1082 * @tc.steps:step2. update local data and sync, cloud returned record not found.
1083 * @tc.expected: step2. return ok.
1084 */
1085 std::string sql = "update " + ASSETS_TABLE_NAME + " set name = 'xxx' where id = 0;";
1086 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
1087 int upIdx = 0;
__anoncaa136471202(const std::string &tableName, VBucket &extend) 1088 g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
1089 LOGD("cloud db upload index:%d", ++upIdx);
1090 if (upIdx == 1) { // 1 is index
1091 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_NOT_FOUND);
1092 }
1093 });
1094 int doUpIdx = 0;
__anoncaa136471302null1095 g_cloudStoreHook->SetDoUploadHook([&doUpIdx] {
1096 LOGD("begin upload index:%d", ++doUpIdx);
1097 });
1098 int callCount = 0;
__anoncaa136471402() 1099 g_cloudStoreHook->SetSyncFinishHook([&callCount, this]() {
1100 LOGD("sync finish times:%d", ++callCount);
1101 if (callCount == 1) { // 1 is the normal sync
1102 CheckUploadAbnormal(OpType::UPDATE, 1L); // 1 is expected count
1103 } else {
1104 CheckUploadAbnormal(OpType::UPDATE, 1L, true); // 1 is expected count
1105 }
1106 g_processCondition.notify_all();
1107 });
1108 CallSync(option);
1109 {
1110 std::unique_lock<std::mutex> lock(g_processMutex);
1111 bool result = g_processCondition.wait_for(lock, std::chrono::seconds(WAIT_TIME),
__anoncaa136471502() 1112 [&callCount]() { return callCount == 2; }); // 2 is sync times
1113 ASSERT_EQ(result, true);
1114 }
1115 }
1116
1117 /**
1118 * @tc.name: UploadAbnormalSync002
1119 * @tc.desc: Test upload insert record, cloud returned record already existed.
1120 * @tc.type: FUNC
1121 * @tc.require:
1122 * @tc.author: bty
1123 */
1124 HWTEST_F(DistributedDBCloudSyncerLockTest, UploadAbnormalSync002, TestSize.Level0)
1125 {
1126 /**
1127 * @tc.steps:step1. insert a and sync
1128 * @tc.expected: step1. return ok.
1129 */
1130 int cloudCount = 1;
1131 InsertLocalData(0, cloudCount, ASSETS_TABLE_NAME, true);
1132 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::DOWNLOAD);
1133 CallSync(option);
1134
1135 /**
1136 * @tc.steps:step2. insert b and sync, cloud returned record not found.
1137 * @tc.expected: step2. return ok.
1138 */
1139 InsertLocalData(cloudCount, cloudCount, ASSETS_TABLE_NAME, true);
1140 int upIdx = 0;
__anoncaa136471602(const std::string &tableName, VBucket &extend) 1141 g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
1142 LOGD("cloud db upload index:%d", ++upIdx);
1143 if (upIdx == 2) { // 2 is index
1144 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_ALREADY_EXISTED);
1145 }
1146 });
1147 int doUpIdx = 0;
__anoncaa136471702null1148 g_cloudStoreHook->SetDoUploadHook([&doUpIdx, cloudCount, this] {
1149 LOGD("begin upload index:%d", ++doUpIdx);
1150 if (doUpIdx == 1) { // 1 is index
1151 InsertCloudDBData(cloudCount, cloudCount, cloudCount, ASSETS_TABLE_NAME);
1152 }
1153 });
1154 int callCount = 0;
__anoncaa136471802() 1155 g_cloudStoreHook->SetSyncFinishHook([&callCount, this]() {
1156 LOGD("sync finish times:%d", ++callCount);
1157 if (callCount == 1) { // 1 is the normal sync
1158 CheckUploadAbnormal(OpType::INSERT, 1L); // 1 is expected count
1159 } else {
1160 CheckUploadAbnormal(OpType::INSERT, 2L, true); // 1 is expected count
1161 }
1162 g_processCondition.notify_all();
1163 });
1164 CallSync(option);
1165 {
1166 std::unique_lock<std::mutex> lock(g_processMutex);
1167 bool result = g_processCondition.wait_for(lock, std::chrono::seconds(WAIT_TIME),
__anoncaa136471902() 1168 [&callCount]() { return callCount == 2; }); // 2 is sync times
1169 ASSERT_EQ(result, true);
1170 }
1171 }
1172
1173 /**
1174 * @tc.name: UploadAbnormalSync003
1175 * @tc.desc: Test upload delete record, cloud returned record not found.
1176 * @tc.type: FUNC
1177 * @tc.require:
1178 * @tc.author: bty
1179 */
1180 HWTEST_F(DistributedDBCloudSyncerLockTest, UploadAbnormalSync003, TestSize.Level0)
1181 {
1182 /**
1183 * @tc.steps:step1. insert local data and sync
1184 * @tc.expected: step1. return ok.
1185 */
1186 int cloudCount = 1;
1187 InsertLocalData(0, cloudCount, ASSETS_TABLE_NAME, true);
1188 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::DOWNLOAD);
1189 CallSync(option);
1190
1191 /**
1192 * @tc.steps:step2. delete local data and sync, cloud returned record not found.
1193 * @tc.expected: step2. return ok.
1194 */
1195 std::string sql = "delete from " + ASSETS_TABLE_NAME + " where id = 0;";
1196 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
1197 int upIdx = 0;
__anoncaa136471a02(const std::string &tableName, VBucket &extend) 1198 g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
1199 LOGD("cloud db upload index:%d", ++upIdx);
1200 if (upIdx == 2) { // 2 is index
1201 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_NOT_FOUND);
1202 }
1203 });
1204 int doUpIdx = 0;
__anoncaa136471b02null1205 g_cloudStoreHook->SetDoUploadHook([&doUpIdx, cloudCount, this] {
1206 LOGD("begin upload index:%d", ++doUpIdx);
1207 if (doUpIdx == 1) { // 1 is index
1208 DeleteCloudDBData(0, cloudCount, ASSETS_TABLE_NAME);
1209 }
1210 });
1211 int callCount = 0;
__anoncaa136471c02() 1212 g_cloudStoreHook->SetSyncFinishHook([&callCount, this]() {
1213 LOGD("sync finish times:%d", ++callCount);
1214 if (callCount == 1) { // 1 is the normal sync
1215 CheckUploadAbnormal(OpType::DELETE, 1L); // 1 is expected count
1216 } else {
1217 CheckUploadAbnormal(OpType::DELETE, 1L, true); // 1 is expected count
1218 }
1219 g_processCondition.notify_all();
1220 });
1221 CallSync(option);
1222 {
1223 std::unique_lock<std::mutex> lock(g_processMutex);
1224 bool result = g_processCondition.wait_for(lock, std::chrono::seconds(WAIT_TIME),
__anoncaa136471d02() 1225 [&callCount]() { return callCount == 1; }); // 1 is sync times
1226 ASSERT_EQ(result, true);
1227 }
1228 }
1229
1230 /**
1231 * @tc.name: ReviseLocalModTimeTest001
1232 * @tc.desc: test sync data with invalid timestamp.
1233 * @tc.type: FUNC
1234 * @tc.require:
1235 * @tc.author: liaoyonghuang
1236 */
1237 HWTEST_F(DistributedDBCloudSyncerLockTest, ReviseLocalModTimeTest001, TestSize.Level0)
1238 {
1239 /**
1240 * @tc.steps:step1. insert local data
1241 * @tc.expected: step1. return ok.
1242 */
1243 int cloudCount = 31; // 31 records
1244 InsertLocalData(0, cloudCount, ASSETS_TABLE_NAME, true);
1245 /**
1246 * @tc.steps:step2. Modify time and sync
1247 * @tc.expected: step2. return ok.
1248 */
1249 uint64_t curTime = 0;
1250 EXPECT_EQ(TimeHelper::GetSysCurrentRawTime(curTime), E_OK);
1251 uint64_t invalidTime = curTime + curTime;
1252 std::string sql = "UPDATE " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
1253 " SET timestamp=" + std::to_string(invalidTime) + " where rowid>0";
1254 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
1255 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
1256 CallSync(option);
1257 /**
1258 * @tc.steps:step3. Check modify time in log table
1259 * @tc.expected: step3. return ok.
1260 */
1261 EXPECT_EQ(TimeHelper::GetSysCurrentRawTime(curTime), E_OK);
1262 sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
1263 " where timestamp < " + std::to_string(curTime);
1264 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
1265 reinterpret_cast<void *>(cloudCount), nullptr), SQLITE_OK);
1266 }
1267
1268 /**
1269 * @tc.name: RemoveAssetsFailTest001
1270 * @tc.desc: Test failCount when remove assets fail
1271 * @tc.type: FUNC
1272 * @tc.require:
1273 * @tc.author: suyue
1274 */
1275 HWTEST_F(DistributedDBCloudSyncerLockTest, RemoveAssetsFailTest001, TestSize.Level0)
1276 {
1277 /**
1278 * @tc.steps:step1. insert local and sync
1279 * @tc.expected: step1. return ok.
1280 */
1281 RuntimeContext::GetInstance()->SetBatchDownloadAssets(true);
1282 InsertLocalData(0, 1, ASSETS_TABLE_NAME, false);
1283 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
1284 CallSync(option);
1285
1286 /**
1287 * @tc.steps:step2. change asset and set RemoveLocalAssets fail
1288 * @tc.expected: step2. return ok.
1289 */
1290 std::string sql = "SELECT asset, assets FROM " + ASSETS_TABLE_NAME + ";";
1291 sqlite3_stmt *stmt = nullptr;
1292 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
1293 Asset asset;
1294 Assets assets;
1295 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1296 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
1297 ASSERT_EQ(sqlite3_column_type(stmt, 1), SQLITE_BLOB);
1298 Type assetsBlob;
1299 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, assetsBlob), E_OK);
1300 assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(assetsBlob));
1301 Type assetBlob;
1302 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Asset>, 0, assetBlob), E_OK);
1303 asset = g_virtualCloudDataTranslate->BlobToAsset(std::get<Bytes>(assetBlob));
1304 }
1305 int errCode = E_OK;
1306 SQLiteUtils::ResetStatement(stmt, true, errCode);
1307 asset.hash = "new_hash";
1308 assets.pop_back();
1309 UpdateCloudAssets(asset, assets, std::string("0"));
1310 g_virtualAssetLoader->SetRemoveStatus(DBStatus::LOCAL_ASSET_NOT_FOUND);
1311
1312 /**
1313 * @tc.steps:step3. sync and check failCount
1314 * @tc.expected: step3. return ok.
1315 */
1316 int downLoadCount = 0;
1317 g_virtualAssetLoader->ForkDownload([this, &downLoadCount](const std::string &tableName,
__anoncaa136471e02(const std::string &tableName, std::map<std::string, Assets> &assets) 1318 std::map<std::string, Assets> &assets) {
1319 downLoadCount++;
1320 if (downLoadCount == 1) {
1321 std::string sql = "delete from " + ASSETS_TABLE_NAME + " WHERE id=0";
1322 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
1323 }
1324 });
1325 CallSync(option);
1326 for (const auto &table : g_syncProcess.tableProcess) {
1327 EXPECT_EQ(table.second.downLoadInfo.failCount, 1u);
1328 }
1329 g_virtualAssetLoader->SetRemoveStatus(DBStatus::OK);
1330 RuntimeContext::GetInstance()->SetBatchDownloadAssets(false);
1331 g_virtualAssetLoader->ForkDownload(nullptr);
1332 }
1333
1334 /**
1335 * @tc.name: CompensateSyncTest001
1336 * @tc.desc: Test only compensates for the sync of deleted data
1337 * @tc.type: FUNC
1338 * @tc.require:
1339 * @tc.author: bty
1340 */
1341 HWTEST_F(DistributedDBCloudSyncerLockTest, CompensateSyncTest001, TestSize.Level1)
1342 {
1343 /**
1344 * @tc.steps:step1. insert cloud and sync
1345 * @tc.expected: step1. return ok.
1346 */
1347 int cloudCount = 30;
1348 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1349 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
1350 CallSync(option);
1351
1352 /**
1353 * @tc.steps:step2. lock and delete 1-10
1354 * @tc.expected: step2. return ok.
1355 */
1356 std::vector<std::vector<uint8_t>> hashKeys;
1357 CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key < 10 ", db, hashKeys);
1358 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKeys, db), OK);
1359 std::string sql = "delete from " + ASSETS_TABLE_NAME + " where id < 10;";
1360 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
1361
1362 /**
1363 * @tc.steps:step3. compensate sync and check query type
1364 * @tc.expected: step3. return ok.
1365 */
__anoncaa136471f02(const std::string &, VBucket &extend) 1366 g_virtualCloudDb->ForkQuery([](const std::string &, VBucket &extend) {
1367 int64_t type;
1368 CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::TYPE_FIELD, extend, type);
1369 EXPECT_EQ(type, 1u);
1370 });
1371 CloudSyncOption cOption = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT,
1372 true, true);
1373 CallSync(cOption);
1374 sleep(1);
1375
1376 /**
1377 * @tc.steps:step4. lock and delete id 30
1378 * @tc.expected: step4. return ok.
1379 */
1380 InsertLocalData(cloudCount, 1, ASSETS_TABLE_NAME, true);
1381 hashKeys.clear();
1382 CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 30 ", db, hashKeys);
1383 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKeys, db), OK);
1384 sql = "delete from " + ASSETS_TABLE_NAME + " where id = 30;";
1385 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
1386
1387 /**
1388 * @tc.steps:step5. compensate sync and check query type
1389 * @tc.expected: step5. return ok.
1390 */
__anoncaa136472002(const std::string &, VBucket &extend) 1391 g_virtualCloudDb->ForkQuery([](const std::string &, VBucket &extend) {
1392 int64_t type;
1393 CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::TYPE_FIELD, extend, type);
1394 EXPECT_EQ(type, 0u);
1395 });
1396 CallSync(cOption);
1397 sleep(1);
1398 g_virtualCloudDb->ForkQuery(nullptr);
1399 }
1400
1401 /**
1402 * @tc.name: UnLockSyncTest001
1403 * @tc.desc: Test sync after unlock data
1404 * @tc.type: FUNC
1405 * @tc.require:
1406 * @tc.author: suyue
1407 */
1408 HWTEST_F(DistributedDBCloudSyncerLockTest, UnLockSyncTest001, TestSize.Level1)
1409 {
1410 /**
1411 * @tc.steps: step1. insert data and sync
1412 * @tc.expected: step1. return ok.
1413 */
1414 int localCount = 100;
1415 InsertLocalData(0, localCount, ASSETS_TABLE_NAME, true);
1416 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
1417 CallSync(option);
1418
1419 /**
1420 * @tc.steps: step2. lock 0-70, update all data and unlock
1421 * @tc.expected: step2. unlock return WAIT_COMPENSATED_SYNC.
1422 */
1423 std::vector<std::vector<uint8_t>> hashKeys;
1424 CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key < 70 ", db, hashKeys);
1425 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKeys, db), OK);
1426 std::string sql = "update " + ASSETS_TABLE_NAME + " set name = 'xxx';";
1427 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
1428
1429 /**
1430 * @tc.steps: step3. non-compensated sync for condition query and isPriorityTask is true.
1431 * @tc.expected: step3. sync data that is not in the UNLOCKING state.
1432 */
1433 EXPECT_EQ(UnLock(ASSETS_TABLE_NAME, hashKeys, db), WAIT_COMPENSATED_SYNC);
1434 std::vector<int> values;
1435 for (int i = 50; i < 100; i++) {
1436 values.push_back(i);
1437 }
1438 option = PrepareOption(Query::Select().From(ASSETS_TABLE_NAME).In("id", values), LockAction::INSERT, true, false);
1439 CallSync(option);
1440 for (const auto &table : g_syncProcess.tableProcess) {
1441 EXPECT_EQ(table.second.upLoadInfo.total, 30u);
1442 }
1443
1444 /**
1445 * @tc.steps: step4. compensate sync and check upLoadInfo
1446 * @tc.expected: step4. synch all data to be compensated in the UNLOCKING state.
1447 */
1448 option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT, true, true);
1449 CallSync(option);
1450 for (const auto &table : g_syncProcess.tableProcess) {
1451 EXPECT_EQ(table.second.upLoadInfo.total, 70u);
1452 }
1453 }
1454
1455 /**
1456 * @tc.name: TaskIdTest001
1457 * @tc.desc: Test sync with specific task id
1458 * @tc.type: FUNC
1459 * @tc.require:
1460 * @tc.author: liaoyonghuang
1461 */
1462 HWTEST_F(DistributedDBCloudSyncerLockTest, TaskIdTest001, TestSize.Level0)
1463 {
1464 /**
1465 * @tc.steps:step1. insert cloud and sync
1466 * @tc.expected: step1. return ok.
1467 */
1468 int cloudCount = 10;
1469 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1470 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
1471 /**
1472 * @tc.steps:step2. sync with specific task id(1) when query
1473 * @tc.expected: step2. return ok.
1474 */
1475 int queryTime = 0;
__anoncaa136472102(const std::string &, VBucket &extend) 1476 g_virtualCloudDb->ForkQuery([&](const std::string &, VBucket &extend) {
1477 if (queryTime == 0) {
1478 queryTime++;
1479 EXPECT_EQ(g_delegate->Sync(option, nullptr, 1u), OK);
1480 }
1481 });
1482 CallSync(option);
1483 /**
1484 * @tc.steps:step3. sync without task id
1485 * @tc.expected: step3. return ok.
1486 */
1487 std::this_thread::sleep_for(std::chrono::milliseconds(200));
__anoncaa136472202(const std::string &, VBucket &extend) 1488 g_virtualCloudDb->ForkQuery([&](const std::string &, VBucket &extend) {
1489 SyncProcess syncProcess = g_delegate->GetCloudTaskStatus(UINT64_MAX - 1);
1490 EXPECT_EQ(syncProcess.errCode, OK);
1491 });
1492 CallSync(option);
1493 g_virtualCloudDb->ForkQuery(nullptr);
1494 }
1495 } // namespace
1496 #endif // RELATIONAL_STORE