1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "cloud/asset_operation_utils.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "process_system_api_adapter_impl.h"
23 #include "relational_store_client.h"
24 #include "relational_store_instance.h"
25 #include "relational_store_manager.h"
26 #include "runtime_config.h"
27 #include "sqlite_relational_store.h"
28 #include "sqlite_relational_utils.h"
29 #include "time_helper.h"
30 #include "virtual_asset_loader.h"
31 #include "virtual_cloud_data_translate.h"
32 #include "virtual_cloud_db.h"
33 #include <gtest/gtest.h>
34 #include <iostream>
35
36 using namespace testing::ext;
37 using namespace DistributedDB;
38 using namespace DistributedDBUnitTest;
39 using namespace std;
40
41 namespace {
42 const string STORE_ID = "Relational_Store_Lock_Sync";
43 const string DB_SUFFIX = ".db";
44 const string ASSETS_TABLE_NAME = "student";
45 const string ASSETS_TABLE_NAME_SHARED = "student_shared";
46 const string DEVICE_CLOUD = "cloud_dev";
47 const string COL_ID = "id";
48 const string COL_NAME = "name";
49 const string COL_HEIGHT = "height";
50 const string COL_ASSET = "asset";
51 const string COL_ASSETS = "assets";
52 const string COL_AGE = "age";
53 const int64_t WAIT_TIME = 5;
54 const std::vector<Field> CLOUD_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true}, {COL_NAME, TYPE_INDEX<std::string>},
55 {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>}};
56 const string CREATE_SINGLE_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + ASSETS_TABLE_NAME + "(" + COL_ID +
57 " INTEGER PRIMARY KEY," + COL_NAME + " TEXT ," + COL_ASSET + " ASSET," + COL_ASSETS + " ASSETS" + ");";
58 const Asset ASSET_COPY = {.version = 1,
59 .name = "Phone",
60 .assetId = "0",
61 .subpath = "/local/sync",
62 .uri = "/local/sync",
63 .modifyTime = "123456",
64 .createTime = "",
65 .size = "256",
66 .hash = "ASE"};
67 const Assets ASSETS_COPY1 = { ASSET_COPY };
68 const string ASSET_SUFFIX = "_copy";
69
70 string g_storePath;
71 string g_testDir;
72 RelationalStoreObserverUnitTest *g_observer = nullptr;
73 DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
74 RelationalStoreDelegate *g_delegate = nullptr;
75 std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
76 std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
77 std::shared_ptr<VirtualCloudDataTranslate> g_virtualCloudDataTranslate;
78 SyncProcess g_syncProcess;
79 std::condition_variable g_processCondition;
80 std::mutex g_processMutex;
81 IRelationalStore *g_store = nullptr;
82 ICloudSyncStorageHook *g_cloudStoreHook = nullptr;
83 int64_t g_nameId;
84 using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
85
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)86 void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
87 {
88 TableSchema assetsTableSchema = {.name = ASSETS_TABLE_NAME, .sharedTableName = ASSETS_TABLE_NAME_SHARED,
89 .fields = CLOUD_FIELDS};
90 dataBaseSchema.tables.push_back(assetsTableSchema);
91 }
92
CloseDb()93 void CloseDb()
94 {
95 delete g_observer;
96 g_virtualCloudDb = nullptr;
97 if (g_delegate != nullptr) {
98 EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
99 g_delegate = nullptr;
100 }
101 }
102
103 class DistributedDBCloudSyncerLockTest : public testing::Test {
104 public:
105 static void SetUpTestCase(void);
106 static void TearDownTestCase(void);
107 void SetUp();
108 void TearDown();
109
110 protected:
111 void Init();
112 const RelationalSyncAbleStorage *GetRelationalStore();
113 void InsertLocalData(int64_t begin, int64_t count, const std::string &tableName, bool isAssetNull = true);
114 void GenerateDataRecords(
115 int64_t begin, int64_t count, int64_t gidStart, std::vector<VBucket> &record, std::vector<VBucket> &extend);
116 void InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart, const std::string &tableName);
117 void UpdateCloudDBData(int64_t begin, int64_t count, int64_t gidStart, int64_t versionStart,
118 const std::string &tableName);
119 void DeleteCloudDBData(int64_t beginGid, int64_t count, const std::string &tableName);
120 void CallSync(const CloudSyncOption &option, DBStatus expectResult = OK);
121
122 void TestConflictSync001(bool isUpdate);
123 void CheckAssetStatusNormal();
124 void UpdateCloudAssets(Asset &asset, Assets &assets, const std::string &version);
125 void CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated = false);
126 sqlite3 *db = nullptr;
127 };
128
SetUpTestCase(void)129 void DistributedDBCloudSyncerLockTest::SetUpTestCase(void)
130 {
131 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
132 g_storePath = g_testDir + "/" + STORE_ID + DB_SUFFIX;
133 LOGI("The test db is:%s", g_storePath.c_str());
134 g_virtualCloudDataTranslate = std::make_shared<VirtualCloudDataTranslate>();
135 RuntimeConfig::SetCloudTranslate(g_virtualCloudDataTranslate);
136 }
137
TearDownTestCase(void)138 void DistributedDBCloudSyncerLockTest::TearDownTestCase(void) {}
139
SetUp(void)140 void DistributedDBCloudSyncerLockTest::SetUp(void)
141 {
142 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
143 LOGE("rm test db files error.");
144 }
145 DistributedDBToolsUnitTest::PrintTestCaseInfo();
146 LOGD("Test dir is %s", g_testDir.c_str());
147 Init();
148 g_cloudStoreHook = (ICloudSyncStorageHook *) GetRelationalStore();
149 ASSERT_NE(g_cloudStoreHook, nullptr);
150 }
151
TearDown(void)152 void DistributedDBCloudSyncerLockTest::TearDown(void)
153 {
154 RefObject::DecObjRef(g_store);
155 g_virtualCloudDb->ForkUpload(nullptr);
156 CloseDb();
157 EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
158 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
159 LOGE("rm test db files error.");
160 }
161 }
162
Init()163 void DistributedDBCloudSyncerLockTest::Init()
164 {
165 db = RelationalTestUtils::CreateDataBase(g_storePath);
166 ASSERT_NE(db, nullptr);
167 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
168 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_SINGLE_PRIMARY_KEY_TABLE), SQLITE_OK);
169 g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
170 ASSERT_NE(g_observer, nullptr);
171 ASSERT_EQ(g_mgr.OpenStore(g_storePath, STORE_ID, RelationalStoreDelegate::Option{.observer = g_observer},
172 g_delegate), DBStatus::OK);
173 ASSERT_NE(g_delegate, nullptr);
174 ASSERT_EQ(g_delegate->CreateDistributedTable(ASSETS_TABLE_NAME, CLOUD_COOPERATION), DBStatus::OK);
175 g_virtualCloudDb = make_shared<VirtualCloudDb>();
176 g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
177 g_syncProcess = {};
178 ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
179 ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
180 DataBaseSchema dataBaseSchema;
181 GetCloudDbSchema(dataBaseSchema);
182 ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
183 g_nameId = 0;
184 }
185
GetRelationalStore()186 const RelationalSyncAbleStorage* DistributedDBCloudSyncerLockTest::GetRelationalStore()
187 {
188 RelationalDBProperties properties;
189 CloudDBSyncUtilsTest::InitStoreProp(g_storePath, APP_ID, USER_ID, STORE_ID, properties);
190 int errCode = E_OK;
191 g_store = RelationalStoreInstance::GetDataBase(properties, errCode);
192 if (g_store == nullptr) {
193 return nullptr;
194 }
195 return static_cast<SQLiteRelationalStore *>(g_store)->GetStorageEngine();
196 }
197
198
GenerateDataRecords(int64_t begin,int64_t count,int64_t gidStart,std::vector<VBucket> & record,std::vector<VBucket> & extend)199 void DistributedDBCloudSyncerLockTest::GenerateDataRecords(
200 int64_t begin, int64_t count, int64_t gidStart, std::vector<VBucket> &record, std::vector<VBucket> &extend)
201 {
202 for (int64_t i = begin; i < begin + count; i++) {
203 Assets assets;
204 Asset asset = ASSET_COPY;
205 asset.name = ASSET_COPY.name + std::to_string(i);
206 assets.emplace_back(asset);
207 VBucket data;
208 data.insert_or_assign(COL_ASSET, asset);
209 asset.name = ASSET_COPY.name + std::to_string(i) + "_copy";
210 assets.emplace_back(asset);
211 data.insert_or_assign(COL_ID, i);
212 data.insert_or_assign(COL_NAME, "name" + std::to_string(g_nameId++));
213 data.insert_or_assign(COL_ASSETS, assets);
214 record.push_back(data);
215
216 VBucket log;
217 Timestamp now = TimeHelper::GetSysCurrentTime();
218 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
219 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
220 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
221 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i + gidStart));
222 extend.push_back(log);
223 }
224 }
225
InsertLocalData(int64_t begin,int64_t count,const std::string & tableName,bool isAssetNull)226 void DistributedDBCloudSyncerLockTest::InsertLocalData(int64_t begin, int64_t count,
227 const std::string &tableName, bool isAssetNull)
228 {
229 int errCode;
230 std::vector<VBucket> record;
231 std::vector<VBucket> extend;
232 GenerateDataRecords(begin, count, 0, record, extend);
233 const string sql = "insert or replace into " + tableName + " values (?,?,?,?);";
234 for (VBucket vBucket : record) {
235 sqlite3_stmt *stmt = nullptr;
236 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
237 ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 1, std::get<int64_t>(vBucket[COL_ID])), E_OK); // 1 is id
238 ASSERT_EQ(SQLiteUtils::BindTextToStatement(stmt, 2, std::get<string>(vBucket[COL_NAME])), E_OK); // 2 is name
239 if (isAssetNull) {
240 ASSERT_EQ(sqlite3_bind_null(stmt, 3), SQLITE_OK); // 3 is asset
241 } else {
242 std::vector<uint8_t> assetBlob = g_virtualCloudDataTranslate->AssetToBlob(ASSET_COPY);
243 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 3, assetBlob, false), E_OK); // 3 is asset
244 }
245 std::vector<uint8_t> assetsBlob = g_virtualCloudDataTranslate->AssetsToBlob(
246 std::get<Assets>(vBucket[COL_ASSETS]));
247 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 4, assetsBlob, false), E_OK); // 4 is assets
248 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
249 SQLiteUtils::ResetStatement(stmt, true, errCode);
250 }
251 }
252
InsertCloudDBData(int64_t begin,int64_t count,int64_t gidStart,const std::string & tableName)253 void DistributedDBCloudSyncerLockTest::InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart,
254 const std::string &tableName)
255 {
256 std::this_thread::sleep_for(std::chrono::milliseconds(1));
257 std::vector<VBucket> record;
258 std::vector<VBucket> extend;
259 GenerateDataRecords(begin, count, gidStart, record, extend);
260 ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(tableName, std::move(record), extend), DBStatus::OK);
261 std::this_thread::sleep_for(std::chrono::milliseconds(1));
262 }
263
UpdateCloudDBData(int64_t begin,int64_t count,int64_t gidStart,int64_t versionStart,const std::string & tableName)264 void DistributedDBCloudSyncerLockTest::UpdateCloudDBData(int64_t begin, int64_t count, int64_t gidStart,
265 int64_t versionStart, const std::string &tableName)
266 {
267 std::this_thread::sleep_for(std::chrono::milliseconds(1));
268 std::vector<VBucket> record;
269 std::vector<VBucket> extend;
270 GenerateDataRecords(begin, count, gidStart, record, extend);
271 for (auto &entry: extend) {
272 entry[CloudDbConstant::VERSION_FIELD] = std::to_string(versionStart++);
273 }
274 ASSERT_EQ(g_virtualCloudDb->BatchUpdate(tableName, std::move(record), extend), DBStatus::OK);
275 std::this_thread::sleep_for(std::chrono::milliseconds(1));
276 }
277
DeleteCloudDBData(int64_t beginGid,int64_t count,const std::string & tableName)278 void DistributedDBCloudSyncerLockTest::DeleteCloudDBData(int64_t beginGid, int64_t count,
279 const std::string &tableName)
280 {
281 Timestamp now = TimeHelper::GetSysCurrentTime();
282 std::vector<VBucket> extend;
283 for (int64_t i = 0; i < count; ++i) {
284 VBucket log;
285 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
286 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
287 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(beginGid + i));
288 extend.push_back(log);
289 }
290 ASSERT_EQ(g_virtualCloudDb->BatchDelete(tableName, extend), DBStatus::OK);
291 std::this_thread::sleep_for(std::chrono::milliseconds(count));
292 }
293
PrepareOption(const Query & query,LockAction action,bool isPriorityTask=false,bool isCompensatedSyncOnly=false)294 CloudSyncOption PrepareOption(const Query &query, LockAction action, bool isPriorityTask = false,
295 bool isCompensatedSyncOnly = false)
296 {
297 CloudSyncOption option;
298 option.devices = { "CLOUD" };
299 option.mode = SYNC_MODE_CLOUD_MERGE;
300 option.query = query;
301 option.waitTime = WAIT_TIME;
302 option.priorityTask = isPriorityTask;
303 option.compensatedSyncOnly = isCompensatedSyncOnly;
304 option.lockAction = action;
305 return option;
306 }
307
CallSync(const CloudSyncOption & option,DBStatus expectResult)308 void DistributedDBCloudSyncerLockTest::CallSync(const CloudSyncOption &option, DBStatus expectResult)
309 {
310 std::mutex dataMutex;
311 std::condition_variable cv;
312 bool finish = false;
313 SyncProcess last;
314 auto callback = [&last, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
315 for (const auto &item: process) {
316 if (item.second.process == DistributedDB::FINISHED) {
317 {
318 std::lock_guard<std::mutex> autoLock(dataMutex);
319 finish = true;
320 last = item.second;
321 }
322 cv.notify_one();
323 }
324 }
325 };
326 ASSERT_EQ(g_delegate->Sync(option, callback), expectResult);
327 if (expectResult == OK) {
328 std::unique_lock<std::mutex> uniqueLock(dataMutex);
329 cv.wait(uniqueLock, [&finish]() {
330 return finish;
331 });
332 }
333 g_syncProcess = last;
334 }
335
TestConflictSync001(bool isUpdate)336 void DistributedDBCloudSyncerLockTest::TestConflictSync001(bool isUpdate)
337 {
338 /**
339 * @tc.steps:step1. init data and sync
340 * @tc.expected: step1. return ok.
341 */
342 int cloudCount = 20;
343 int localCount = 10;
344 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
345 InsertLocalData(0, localCount, ASSETS_TABLE_NAME, true);
346 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
347 CallSync(option);
348
349 /**
350 * @tc.steps:step2. update local data to upload, and set hook before upload, operator cloud data which id is 1
351 * @tc.expected: step2. return ok.
352 */
353 std::string sql;
354 if (isUpdate) {
355 sql = "update " + ASSETS_TABLE_NAME + " set name = 'xxx' where id = 1;";
356 } else {
357 sql = "delete from " + ASSETS_TABLE_NAME + " where id = 1;";
358 }
359 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
360 int index = 0;
361 g_cloudStoreHook->SetDoUploadHook([&index, this]() {
362 if (++index == 1) {
363 UpdateCloudDBData(1, 1, 0, 21, ASSETS_TABLE_NAME); // 21 is version
364 }
365 });
366
367 /**
368 * @tc.steps:step3. sync and check local data
369 * @tc.expected: step3. return ok.
370 */
371 CallSync(option);
372 sql = "select count(*) from " + ASSETS_TABLE_NAME + " where name = 'name30' AND id = '1';";
373 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
374 reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
375 }
376
CheckAssetStatusNormal()377 void DistributedDBCloudSyncerLockTest::CheckAssetStatusNormal()
378 {
379 std::string sql = "SELECT asset, assets FROM " + ASSETS_TABLE_NAME + ";";
380 sqlite3_stmt *stmt = nullptr;
381 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
382 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
383 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
384 ASSERT_EQ(sqlite3_column_type(stmt, 1), SQLITE_BLOB);
385 Type assetBlob;
386 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Asset>, 0, assetBlob), E_OK);
387 Asset asset = g_virtualCloudDataTranslate->BlobToAsset(std::get<Bytes>(assetBlob));
388 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
389 Type assetsBlob;
390 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, assetsBlob), E_OK);
391 Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(assetsBlob));
392 for (const auto &as : assets) {
393 EXPECT_EQ(as.status, static_cast<uint32_t>(AssetStatus::NORMAL));
394 }
395 }
396 int errCode = E_OK;
397 SQLiteUtils::ResetStatement(stmt, true, errCode);
398 }
399
UpdateCloudAssets(Asset & asset,Assets & assets,const std::string & version)400 void DistributedDBCloudSyncerLockTest::UpdateCloudAssets(Asset &asset, Assets &assets, const std::string &version)
401 {
402 std::this_thread::sleep_for(std::chrono::milliseconds(1));
403 VBucket data;
404 std::vector<VBucket> record;
405 std::vector<VBucket> extend;
406 asset.name.empty() ? data.insert_or_assign(COL_ASSET, Nil()) : data.insert_or_assign(COL_ASSET, asset);
407 data.insert_or_assign(COL_ID, 0L);
408 data.insert_or_assign(COL_NAME, "name" + std::to_string(g_nameId++));
409 assets.empty() ? data.insert_or_assign(COL_ASSETS, Nil()) : data.insert_or_assign(COL_ASSETS, assets);
410 record.push_back(data);
411 VBucket log;
412 Timestamp now = TimeHelper::GetSysCurrentTime();
413 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
414 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
415 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
416 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(0));
417 log.insert_or_assign(CloudDbConstant::VERSION_FIELD, version);
418 extend.push_back(log);
419 ASSERT_EQ(g_virtualCloudDb->BatchUpdate(ASSETS_TABLE_NAME, std::move(record), extend), DBStatus::OK);
420 std::this_thread::sleep_for(std::chrono::milliseconds(1));
421 }
422
CheckUploadAbnormal(OpType opType,int64_t expCnt,bool isCompensated)423 void DistributedDBCloudSyncerLockTest::CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated)
424 {
425 std::string sql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE ";
426 switch (opType) {
427 case OpType::INSERT:
428 sql += isCompensated ? " cloud_gid != '' AND version !='' AND flag&0x10=0" :
429 " cloud_gid != '' AND version !='' AND flag=flag|0x10";
430 break;
431 case OpType::UPDATE:
432 sql += isCompensated ? " cloud_gid != '' AND version !='' AND flag&0x10=0" :
433 " cloud_gid == '' AND version =='' AND flag=flag|0x10";
434 break;
435 case OpType::DELETE:
436 sql += " cloud_gid == '' AND version ==''";
437 break;
438 default:
439 break;
440 }
441 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
442 reinterpret_cast<void *>(expCnt), nullptr), SQLITE_OK);
443 }
444
445 /**
446 * @tc.name: RDBUnlockCloudSync001
447 * @tc.desc: Test sync with no lock
448 * @tc.type: FUNC
449 * @tc.require:
450 * @tc.author: bty
451 */
452 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBUnlockCloudSync001, TestSize.Level0)
453 {
454 /**
455 * @tc.steps:step1. init data and sync with none lock
456 * @tc.expected: step1. return ok.
457 */
458 int cloudCount = 20;
459 int localCount = 10;
460 InsertLocalData(0, cloudCount, ASSETS_TABLE_NAME, true);
461 InsertCloudDBData(0, localCount, 0, ASSETS_TABLE_NAME);
462 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::NONE);
463 CallSync(option);
464
465 /**
466 * @tc.steps:step2. insert or replace, check version
467 * @tc.expected: step2. return ok.
468 */
469 std::string sql = "INSERT OR REPLACE INTO " + ASSETS_TABLE_NAME + " VALUES('0', 'XX', '', '');";
470 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
471 sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
472 " where version != '' and version is not null;";
473 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
474 reinterpret_cast<void *>(cloudCount), nullptr), SQLITE_OK);
475 }
476
477 /**
478 * @tc.name: RDBConflictCloudSync001
479 * @tc.desc: Both cloud and local are available, local version is empty, with cloud updates before upload
480 * @tc.type: FUNC
481 * @tc.require:
482 * @tc.author: bty
483 */
484 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync001, TestSize.Level0)
485 {
486 /**
487 * @tc.steps:step1. init data and set hook before upload, update cloud data which gid is 1
488 * @tc.expected: step1. return ok.
489 */
490 int cloudCount = 20;
491 int localCount = 10;
492 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
493 InsertLocalData(0, localCount, ASSETS_TABLE_NAME, true);
494 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
495 int index = 0;
__anoncb3c72070502() 496 g_cloudStoreHook->SetDoUploadHook([&index, this]() {
497 if (++index == 1) {
498 UpdateCloudDBData(1, 1, 0, 1, ASSETS_TABLE_NAME);
499 }
500 });
501
502 /**
503 * @tc.steps:step2. sync and check local data
504 * @tc.expected: step2. return ok.
505 */
506 CallSync(option);
507 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
508 " where flag&0x02=0 AND version='20' AND cloud_gid = '1';";
509 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
510 reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
511 }
512
513 /**
514 * @tc.name: RDBConflictCloudSync002
515 * @tc.desc: Both cloud and local are available, with cloud updates before upload
516 * @tc.type: FUNC
517 * @tc.require:
518 * @tc.author: bty
519 */
520 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync002, TestSize.Level0)
521 {
522 TestConflictSync001(true);
523 }
524
525 /**
526 * @tc.name: RDBConflictCloudSync003
527 * @tc.desc: Both cloud and local are available, with cloud deletes before upload
528 * @tc.type: FUNC
529 * @tc.require:
530 * @tc.author: bty
531 */
532 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync003, TestSize.Level0)
533 {
534 TestConflictSync001(false);
535 }
536
537 /**
538 * @tc.name: RDBConflictCloudSync003
539 * @tc.desc: Both cloud and local are available, with cloud inserts before upload
540 * @tc.type: FUNC
541 * @tc.require:
542 * @tc.author: bty
543 */
544 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync004, TestSize.Level0)
545 {
546 /**
547 * @tc.steps:step1. init data and sync
548 * @tc.expected: step1. return ok.
549 */
550 int cloudCount = 20;
551 int localCount = 10;
552 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
553 InsertLocalData(0, localCount, ASSETS_TABLE_NAME, true);
554 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
555 CallSync(option);
556
557 /**
558 * @tc.steps:step2. insert local data and set hook before upload, insert cloud data which id is 20
559 * @tc.expected: step2. return ok.
560 */
561 std::string sql = "INSERT INTO " + ASSETS_TABLE_NAME + " VALUES('20', 'XXX', NULL, NULL);";
562 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
563 int index = 0;
__anoncb3c72070602() 564 g_cloudStoreHook->SetDoUploadHook([&index, cloudCount, this]() {
565 if (++index == 1) {
566 InsertCloudDBData(cloudCount, 1, cloudCount, ASSETS_TABLE_NAME);
567 }
568 });
569
570 /**
571 * @tc.steps:step3. set hook for batch insert, return CLOUD_VERSION_CONFLICT err
572 * @tc.expected: step3. return ok.
573 */
574 g_virtualCloudDb->ForkInsertConflict([](const std::string &tableName, VBucket &extend, VBucket &record,
__anoncb3c72070702(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 575 std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
576 for (auto &[cloudRecord, cloudExtend]: cloudDataVec) {
577 int64_t cloudPk;
578 CloudStorageUtils::GetValueFromVBucket<int64_t>(COL_ID, record, cloudPk);
579 int64_t localPk;
580 CloudStorageUtils::GetValueFromVBucket<int64_t>(COL_ID, cloudRecord, localPk);
581 if (cloudPk != localPk) {
582 continue;
583 }
584 std::string localVersion;
585 CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD, extend, localVersion);
586 std::string cloudVersion;
587 CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD, cloudExtend,
588 cloudVersion);
589 if (localVersion != cloudVersion) {
590 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_VERSION_CONFLICT);
591 return CLOUD_VERSION_CONFLICT;
592 }
593 }
594 return OK;
595 });
596
597 /**
598 * @tc.steps:step3. sync and check local data
599 * @tc.expected: step3. return ok.
600 */
601 CallSync(option);
602 sql = "select count(*) from " + ASSETS_TABLE_NAME + " where name = 'name30' AND id = '20';";
603 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
604 reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
605 for (const auto &table : g_syncProcess.tableProcess) {
606 EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
607 }
608 }
609
610 /**
611 * @tc.name: QueryCursorTest001
612 * @tc.desc: Test cursor after querying no data
613 * @tc.type: FUNC
614 * @tc.require:
615 * @tc.author: bty
616 */
617 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest001, TestSize.Level0)
618 {
619 /**
620 * @tc.steps:step1. init data and Query with cursor tha exceeds range
621 * @tc.expected: step1. return ok.
622 */
623 int cloudCount = 20;
624 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
625 VBucket extend;
626 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(30);
627 std::vector<VBucket> data;
628
629 /**
630 * @tc.steps:step2. check cursor output param
631 * @tc.expected: step2. return QUERY_END.
632 */
633 EXPECT_EQ(g_virtualCloudDb->Query(ASSETS_TABLE_NAME, extend, data), QUERY_END);
634 EXPECT_EQ(std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]), std::to_string(cloudCount));
635 }
636
637 /**
638 * @tc.name: QueryCursorTest002
639 * @tc.desc: Test cursor in conditional query sync
640 * @tc.type: FUNC
641 * @tc.require:
642 * @tc.author: bty
643 */
644 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest002, TestSize.Level0)
645 {
646 /**
647 * @tc.steps:step1. init data
648 * @tc.expected: step1. return ok.
649 */
650 int count = 10;
651 InsertCloudDBData(0, count, 0, ASSETS_TABLE_NAME);
652 InsertLocalData(0, count, ASSETS_TABLE_NAME, true);
653 std::vector<int> idVec = {2, 3};
654 CloudSyncOption option = PrepareOption(Query::Select().From(ASSETS_TABLE_NAME).In("id", idVec),
655 LockAction::DOWNLOAD, true);
656 int index = 0;
657
658 /**
659 * @tc.steps:step2. sync and check cursor
660 * @tc.expected: step2. return ok.
661 */
__anoncb3c72070802(const std::string &, VBucket &extend) 662 g_virtualCloudDb->ForkQuery([&index](const std::string &, VBucket &extend) {
663 if (index == 1) {
664 std::string cursor;
665 CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::CURSOR_FIELD, extend, cursor);
666 EXPECT_EQ(cursor, std::string(""));
667 }
668 index++;
669 });
670 CallSync(option);
671 }
672
673 /**
674 * @tc.name: RecordConflictTest001
675 * @tc.desc: Test the asset input param after download return CLOUD_RECORD_EXIST_CONFLICT
676 * @tc.type: FUNC
677 * @tc.require:
678 * @tc.author: bty
679 */
680 HWTEST_F(DistributedDBCloudSyncerLockTest, RecordConflictTest001, TestSize.Level0)
681 {
682 /**
683 * @tc.steps:step1. init data and sync
684 * @tc.expected: step1. return ok.
685 */
686 int count = 10;
687 InsertCloudDBData(0, count, 0, ASSETS_TABLE_NAME);
688 g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
689 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
690 int callCount = 0;
__anoncb3c72070902() 691 g_cloudStoreHook->SetSyncFinishHook([&callCount]() {
692 callCount++;
693 g_processCondition.notify_all();
694 });
695 CallSync(option);
696 {
697 std::unique_lock<std::mutex> lock(g_processMutex);
698 bool result = g_processCondition.wait_for(lock, std::chrono::seconds(WAIT_TIME),
__anoncb3c72070a02() 699 [&callCount]() { return callCount == 2; }); // 2 is compensated sync
700 ASSERT_EQ(result, true);
701 }
702
703 /**
704 * @tc.steps:step2. sync again and check asset
705 * @tc.expected: step2. return ok.
706 */
707 g_virtualAssetLoader->SetDownloadStatus(DBStatus::OK);
__anoncb3c72070b02(std::map<std::string, Assets> &assets) 708 g_virtualAssetLoader->ForkDownload([](std::map<std::string, Assets> &assets) {
709 EXPECT_EQ(assets.find(COL_ASSET) != assets.end(), true);
710 });
711 CallSync(option);
712 {
713 std::unique_lock<std::mutex> lock(g_processMutex);
714 bool result = g_processCondition.wait_for(lock, std::chrono::seconds(WAIT_TIME),
__anoncb3c72070c02() 715 [&callCount]() { return callCount == 4; }); // 4 is compensated sync
716 ASSERT_EQ(result, true);
717 }
718 g_cloudStoreHook->SetSyncFinishHook(nullptr);
719 }
720
721 /**
722 * @tc.name: QueryCursorTest003
723 * @tc.desc: Test whether cursor fallback
724 * @tc.type: FUNC
725 * @tc.require:
726 * @tc.author: bty
727 */
728 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest003, TestSize.Level0)
729 {
730 /**
731 * @tc.steps:step1. init cloud data and sync
732 * @tc.expected: step1. return ok.
733 */
734 int cloudCount = 10;
735 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
736 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
737 CallSync(option);
738
739 /**
740 * @tc.steps:step2. delete cloud data and sync
741 * @tc.expected: step2. return ok.
742 */
743 DeleteCloudDBData(0, cloudCount, ASSETS_TABLE_NAME);
744 CallSync(option);
745
746 /**
747 * @tc.steps:step3. remove data
748 * @tc.expected: step3. return ok.
749 */
750 std::string device = "";
751 ASSERT_EQ(g_delegate->RemoveDeviceData(device, DistributedDB::FLAG_ONLY), DBStatus::OK);
752
753 /**
754 * @tc.steps:step4. insert local and check cursor
755 * @tc.expected: step4. return ok.
756 */
757 InsertLocalData(0, 1, ASSETS_TABLE_NAME, true);
758 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
759 " where cursor='31';";
760 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
761 reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
762 }
763
764 /**
765 * @tc.name: QueryCursorTest004
766 * @tc.desc: Test temp trigger under concurrency
767 * @tc.type: FUNC
768 * @tc.require:
769 * @tc.author: bty
770 */
771 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest004, TestSize.Level0)
772 {
773 /**
774 * @tc.steps:step1. init cloud data
775 * @tc.expected: step1. return ok.
776 */
777 int cloudCount = 10;
778 InsertLocalData(0, cloudCount, ASSETS_TABLE_NAME, true);
779 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
780
781 /**
782 * @tc.steps:step2. set tracker table before saving cloud data
783 * @tc.expected: step2. return ok.
784 */
__anoncb3c72070d02(const std::string &table, VBucket &) 785 g_virtualCloudDb->ForkQuery([](const std::string &table, VBucket &) {
786 TrackerSchema schema = {
787 .tableName = ASSETS_TABLE_NAME, .extendColName = COL_NAME, .trackerColNames = { COL_ID }
788 };
789 EXPECT_EQ(g_delegate->SetTrackerTable(schema), WITH_INVENTORY_DATA);
790 });
791 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
792 CallSync(option);
793
794 /**
795 * @tc.steps:step3. check extend_field and cursor
796 * @tc.expected: step3. return ok.
797 */
798 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
799 " where data_key='0' and extend_field='name10' and cursor='31';";
800 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
801 reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
802 }
803 } // namespace
804 #endif // RELATIONAL_STORE