1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifdef RELATIONAL_STORE
17 #include <gtest/gtest.h>
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud/cloud_db_types.h"
20 #include "cloud_db_sync_utils_test.h"
21 #include "db_common.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "log_print.h"
24 #include "relational_store_delegate.h"
25 #include "relational_store_instance.h"
26 #include "relational_store_manager.h"
27 #include "relational_sync_able_storage.h"
28 #include "runtime_config.h"
29 #include "time_helper.h"
30 #include "virtual_asset_loader.h"
31 #include "virtual_cloud_data_translate.h"
32 #include "virtual_cloud_db.h"
33 #include "virtual_communicator_aggregator.h"
34
35 using namespace testing::ext;
36 using namespace DistributedDB;
37 using namespace DistributedDBUnitTest;
38
39 namespace {
40 constexpr const char *DB_SUFFIX = ".db";
41 constexpr const char *STORE_ID = "Relational_Store_ID";
42 constexpr const char *CREATE_TABLE_A_SQL =
43 "CREATE TABLE IF NOT EXISTS worker_a(" \
44 "id TEXT PRIMARY KEY," \
45 "name TEXT," \
46 "height REAL ," \
47 "photo BLOB," \
48 "age INT);";
49 constexpr const char *CREATE_TABLE_B_SQL =
50 "CREATE TABLE IF NOT EXISTS worker_b(" \
51 "id TEXT PRIMARY KEY," \
52 "name TEXT," \
53 "height REAL ," \
54 "photo BLOB," \
55 "age INT);";
56 constexpr const char *CREATE_TABLE_C_SQL =
57 "CREATE TABLE IF NOT EXISTS worker_c(" \
58 "id TEXT PRIMARY KEY," \
59 "name TEXT," \
60 "height REAL ," \
61 "photo BLOB," \
62 "age INT);";
63 constexpr const char *CREATE_TABLE_D_SQL =
64 "CREATE TABLE IF NOT EXISTS worker_d(" \
65 "id TEXT PRIMARY KEY," \
66 "name TEXT," \
67 "height REAL ," \
68 "photo BLOB," \
69 "age INT);";
70 const int64_t SYNC_WAIT_TIME = 60;
71
CreateUserDBAndTable(sqlite3 * & db)72 void CreateUserDBAndTable(sqlite3 *&db)
73 {
74 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
75 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_A_SQL), SQLITE_OK);
76 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_B_SQL), SQLITE_OK);
77 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_C_SQL), SQLITE_OK);
78 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_D_SQL), SQLITE_OK);
79 }
80
PrepareOption(CloudSyncOption & option,const Query & query,bool merge=false)81 void PrepareOption(CloudSyncOption &option, const Query &query, bool merge = false)
82 {
83 option.devices = { "CLOUD" };
84 option.mode = SYNC_MODE_CLOUD_MERGE;
85 option.query = query;
86 option.waitTime = SYNC_WAIT_TIME;
87 option.priorityTask = false;
88 option.compensatedSyncOnly = false;
89 option.merge = merge;
90 }
91
92 class DistributedDBCloudTaskMergeTest : public testing::Test {
93 public:
94 static void SetUpTestCase();
95 static void TearDownTestCase();
96 void SetUp() override;
97 void TearDown() override;
98 protected:
99 void InitTestDir();
100 DataBaseSchema GetSchema();
101 void CloseDb();
102 void InsertUserTableRecord(const std::string &tableName, int64_t recordCounts, int64_t begin = 0);
103 void CheckCloudTableCount(const std::vector<std::string> &tableName, int64_t expectCount);
104 void SetForkQueryForCloudMergeSyncTest001(std::atomic<int> &count);
105 static SyncProcessCallback GetProcessCallback(const std::function<void(DBStatus)> &checkFinish,
106 std::mutex &callbackMutex, std::condition_variable &callbackCv, size_t &finishCount);
107 std::string testDir_;
108 std::string storePath_;
109 sqlite3 *db_ = nullptr;
110 RelationalStoreDelegate *delegate_ = nullptr;
111 std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
112 std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
113 std::shared_ptr<RelationalStoreManager> mgr_ = nullptr;
114 std::string tableNameA_ = "worker_a";
115 std::string tableNameB_ = "worker_b";
116 std::string tableNameC_ = "worker_c";
117 std::string tableNameD_ = "worker_d";
118 std::vector<std::string> tables_ = { tableNameA_, tableNameB_, tableNameC_, tableNameD_ };
119 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
120 };
121
SetUpTestCase()122 void DistributedDBCloudTaskMergeTest::SetUpTestCase()
123 {
124 }
125
TearDownTestCase()126 void DistributedDBCloudTaskMergeTest::TearDownTestCase()
127 {
128 }
129
SetUp()130 void DistributedDBCloudTaskMergeTest::SetUp()
131 {
132 DistributedDBToolsUnitTest::PrintTestCaseInfo();
133 InitTestDir();
134 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != 0) {
135 LOGE("rm test db files error.");
136 }
137 DistributedDBToolsUnitTest::PrintTestCaseInfo();
138 LOGD("Test dir is %s", testDir_.c_str());
139 db_ = RelationalTestUtils::CreateDataBase(storePath_);
140 ASSERT_NE(db_, nullptr);
141 CreateUserDBAndTable(db_);
142 mgr_ = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
143 RelationalStoreDelegate::Option option;
144 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
145 ASSERT_NE(delegate_, nullptr);
146 for (const auto &table : tables_) {
147 ASSERT_EQ(delegate_->CreateDistributedTable(table, CLOUD_COOPERATION), DBStatus::OK);
148 }
149 virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
150 virtualAssetLoader_ = std::make_shared<VirtualAssetLoader>();
151 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
152 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
153 DataBaseSchema dataBaseSchema = GetSchema();
154 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
155 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
156 ASSERT_TRUE(communicatorAggregator_ != nullptr);
157 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
158 }
159
TearDown()160 void DistributedDBCloudTaskMergeTest::TearDown()
161 {
162 virtualCloudDb_->ForkQuery(nullptr);
163 virtualCloudDb_->SetCloudError(false);
164 CloseDb();
165 EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
166 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != E_OK) {
167 LOGE("rm test db files error.");
168 }
169 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
170 communicatorAggregator_ = nullptr;
171 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
172 }
173
InitTestDir()174 void DistributedDBCloudTaskMergeTest::InitTestDir()
175 {
176 if (!testDir_.empty()) {
177 return;
178 }
179 DistributedDBToolsUnitTest::TestDirInit(testDir_);
180 storePath_ = testDir_ + "/" + STORE_ID_1 + ".db";
181 LOGI("The test db is:%s", testDir_.c_str());
182 }
183
GetSchema()184 DataBaseSchema DistributedDBCloudTaskMergeTest::GetSchema()
185 {
186 DataBaseSchema schema;
187 for (const auto &table : tables_) {
188 TableSchema tableSchema;
189 tableSchema.name = table;
190 tableSchema.fields = {
191 {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
192 {"photo", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
193 };
194 schema.tables.push_back(tableSchema);
195 }
196 return schema;
197 }
198
CloseDb()199 void DistributedDBCloudTaskMergeTest::CloseDb()
200 {
201 virtualCloudDb_ = nullptr;
202 if (mgr_ != nullptr) {
203 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
204 delegate_ = nullptr;
205 mgr_ = nullptr;
206 }
207 }
208
InsertUserTableRecord(const std::string & tableName,int64_t recordCounts,int64_t begin)209 void DistributedDBCloudTaskMergeTest::InsertUserTableRecord(const std::string &tableName,
210 int64_t recordCounts, int64_t begin)
211 {
212 ASSERT_NE(db_, nullptr);
213 for (int64_t i = begin; i < begin + recordCounts; ++i) {
214 string sql = "INSERT OR REPLACE INTO " + tableName +
215 " (id, name, height, photo, age) VALUES ('" + std::to_string(i) + "', 'Local" +
216 std::to_string(i) + "', '155.10', 'text', '21');";
217 ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
218 }
219 }
220
CheckCloudTableCount(const std::vector<std::string> & tableNames,int64_t expectCount)221 void DistributedDBCloudTaskMergeTest::CheckCloudTableCount(const std::vector<std::string> &tableNames,
222 int64_t expectCount)
223 {
224 for (const auto &tableName : tableNames) {
225 VBucket extend;
226 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
227 int64_t realCount = 0;
228 std::vector<VBucket> data;
229 virtualCloudDb_->Query(tableName, extend, data);
230 for (size_t j = 0; j < data.size(); ++j) {
231 auto entry = data[j].find(CloudDbConstant::DELETE_FIELD);
232 if (entry != data[j].end() && std::get<bool>(entry->second)) {
233 continue;
234 }
235 realCount++;
236 }
237 LOGI("check table %s", tableName.c_str());
238 EXPECT_EQ(realCount, expectCount); // ExpectCount represents the total amount of cloud data.
239 }
240 }
241
SetForkQueryForCloudMergeSyncTest001(std::atomic<int> & count)242 void DistributedDBCloudTaskMergeTest::SetForkQueryForCloudMergeSyncTest001(std::atomic<int> &count)
243 {
244 virtualCloudDb_->ForkQuery([&count](const std::string &, VBucket &) {
245 count++;
246 if (count == 1) { // taskid1
247 std::this_thread::sleep_for(std::chrono::seconds(1));
248 }
249 });
250 }
251
GetProcessCallback(const std::function<void (DBStatus)> & checkFinish,std::mutex & callbackMutex,std::condition_variable & callbackCv,size_t & finishCount)252 SyncProcessCallback DistributedDBCloudTaskMergeTest::GetProcessCallback(
253 const std::function<void(DBStatus)> &checkFinish, std::mutex &callbackMutex,
254 std::condition_variable &callbackCv, size_t &finishCount)
255 {
256 return [checkFinish, &callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
257 for (const auto &item: process) {
258 if (item.second.process == DistributedDB::FINISHED) {
259 if (checkFinish) {
260 checkFinish(item.second.errCode);
261 }
262 {
263 std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
264 finishCount++;
265 }
266 LOGW("current finish %zu", finishCount);
267 callbackCv.notify_one();
268 }
269 }
270 };
271 }
272
273 /**
274 * @tc.name: CloudSyncMergeTaskTest001
275 * @tc.desc: test merge sync task
276 * @tc.type: FUNC
277 * @tc.require:
278 * @tc.author: chenchaohao
279 */
280 HWTEST_F(DistributedDBCloudTaskMergeTest, CloudSyncMergeTaskTest001, TestSize.Level0)
281 {
282 /**
283 * @tc.steps:step1. insert user table record.
284 * @tc.expected: step1. ok.
285 */
286 const int actualCount = 10; // 10 is count of records
287 InsertUserTableRecord(tableNameA_, actualCount);
288 InsertUserTableRecord(tableNameB_, actualCount);
289 /**
290 * @tc.steps:step2. set callback to check during sync.
291 * @tc.expected: step2. ok.
292 */
293 std::atomic<int> count = 0;
294 SetForkQueryForCloudMergeSyncTest001(count);
295
296 Query normalQuery1 = Query::Select().FromTable({ tableNameA_ });
297 CloudSyncOption option;
298 PrepareOption(option, normalQuery1, false);
299 ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
300
301 std::mutex callbackMutex;
302 std::condition_variable callbackCv;
303 size_t finishCount = 0u;
304 auto callback1 = GetProcessCallback(nullptr, callbackMutex, callbackCv, finishCount);
305
306 Query normalQuery2 = Query::Select().FromTable({ tableNameB_ });
307 PrepareOption(option, normalQuery2, true);
308 ASSERT_EQ(delegate_->Sync(option, callback1), OK);
309
310 InsertUserTableRecord(tableNameC_, actualCount);
311 InsertUserTableRecord(tableNameD_, actualCount);
312
313 Query normalQuery3 = Query::Select().FromTable({ tableNameC_, tableNameD_ });
314 PrepareOption(option, normalQuery3, true);
315 ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
316
317 Query normalQuery4 = Query::Select().FromTable({ tableNameB_, tableNameC_, tableNameD_ });
318 PrepareOption(option, normalQuery4, true);
319 ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
320 std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon19125ece0402() 321 callbackCv.wait(callbackLock, [&finishCount]() {
322 return (finishCount >= 1u);
323 });
324 CheckCloudTableCount({ tableNameB_, tableNameC_, tableNameD_ }, actualCount);
325 }
326
327 /**
328 * @tc.name: CloudSyncMergeTaskTest002
329 * @tc.desc: test merge sync task with different mode.
330 * @tc.type: FUNC
331 * @tc.require:
332 * @tc.author: liaoyonghuang
333 */
334 HWTEST_F(DistributedDBCloudTaskMergeTest, CloudSyncMergeTaskTest002, TestSize.Level1)
335 {
336 /**
337 * @tc.steps:step1. insert user table record.
338 * @tc.expected: step1. ok.
339 */
340 const int actualCount = 10; // 10 is count of records
341 InsertUserTableRecord(tableNameA_, actualCount);
342 Query normalQuery1 = Query::Select().FromTable({ tableNameA_ });
343 CloudSyncOption option;
344 PrepareOption(option, normalQuery1, true);
345 /**
346 * @tc.steps:step2. set 2s block time for sync task 1st, and start sync task 2nd.
347 * @tc.expected: step2. ok.
348 */
349 virtualCloudDb_->SetBlockTime(2000); // block 1st sync task 2s.
__anon19125ece0502() 350 std::thread syncThread1([&]() {
351 ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
352 });
353 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
__anon19125ece0602() 354 std::thread syncThread2([&]() {
355 ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
356 });
357 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
358
359 /**
360 * @tc.steps:step3. start sync task 3rd.
361 * @tc.expected: task CLOUD_SYNC_TASK_MERGED because it was merged into Task 2.
362 */
__anon19125ece0702(const std::map<std::string, SyncProcess> &process) 363 auto callback3 = [](const std::map<std::string, SyncProcess> &process) {
364 for (const auto &item: process) {
365 ASSERT_EQ(item.second.errCode, CLOUD_SYNC_TASK_MERGED);
366 }
367 };
__anon19125ece0802() 368 std::thread syncThread3([&]() {
369 ASSERT_EQ(delegate_->Sync(option, callback3), OK);
370 });
371 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
372
373 /**
374 * @tc.steps:step4. start sync task 4th.
375 * @tc.expected: task was not merged because the mode is not SYNC_MODE_CLOUD_MERGE.
376 */
__anon19125ece0902(const std::map<std::string, SyncProcess> &process) 377 auto callback4 = [](const std::map<std::string, SyncProcess> &process) {
378 for (const auto &item: process) {
379 ASSERT_EQ(item.second.errCode, OK);
380 }
381 };
__anon19125ece0a02() 382 std::thread syncThread4([&]() {
383 option.mode = SYNC_MODE_CLOUD_FORCE_PUSH;
384 ASSERT_EQ(delegate_->Sync(option, callback4), OK);
385 });
386 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
387
388 /**
389 * @tc.steps:step5. start sync task 5th.
390 * @tc.expected: task CLOUD_SYNC_TASK_MERGED because it was merged into Task 2.
391 */
__anon19125ece0b02(const std::map<std::string, SyncProcess> &process) 392 auto callback5 = [](const std::map<std::string, SyncProcess> &process) {
393 for (const auto &item: process) {
394 ASSERT_EQ(item.second.errCode, CLOUD_SYNC_TASK_MERGED);
395 }
396 };
__anon19125ece0c02() 397 std::thread syncThread5([&]() {
398 option.mode = SYNC_MODE_CLOUD_MERGE;
399 ASSERT_EQ(delegate_->Sync(option, callback5), OK);
400 });
401
402 syncThread1.join();
403 syncThread2.join();
404 syncThread3.join();
405 syncThread4.join();
406 syncThread5.join();
407 }
408
409 /**
410 * @tc.name: CloudSyncMergeTaskTest003
411 * @tc.desc: test merge sync task which merge is false.
412 * @tc.type: FUNC
413 * @tc.require:
414 * @tc.author: liaoyonghuang
415 */
416 HWTEST_F(DistributedDBCloudTaskMergeTest, CloudSyncMergeTaskTest003, TestSize.Level1)
417 {
418 /**
419 * @tc.steps:step1. insert user table record.
420 * @tc.expected: step1. ok.
421 */
422 const int actualCount = 10; // 10 is count of records
423 InsertUserTableRecord(tableNameA_, actualCount);
424 Query normalQuery1 = Query::Select().FromTable({ tableNameA_ });
425 CloudSyncOption option;
426 PrepareOption(option, normalQuery1, true);
427 /**
428 * @tc.steps:step2. set 2s block time for sync task 1st, and start sync task 2nd.
429 * @tc.expected: step2. ok.
430 */
431 virtualCloudDb_->SetBlockTime(2000); // block 1st sync task 2s.
__anon19125ece0d02() 432 std::thread syncThread1([&]() {
433 ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
434 });
435 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
__anon19125ece0e02() 436 std::thread syncThread2([&]() {
437 ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
438 });
439 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
440 /**
441 * @tc.steps:step3. start sync task 3rd.
442 * @tc.expected: task CLOUD_SYNC_TASK_MERGED because it was merged into Task 2.
443 */
__anon19125ece0f02(const std::map<std::string, SyncProcess> &process) 444 auto callback3 = [](const std::map<std::string, SyncProcess> &process) {
445 for (const auto &item: process) {
446 ASSERT_EQ(item.second.errCode, CLOUD_SYNC_TASK_MERGED);
447 EXPECT_EQ(item.second.tableProcess.size(), 1u);
448 for (const auto &table : item.second.tableProcess) {
449 EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
450 }
451 }
452 };
__anon19125ece1002() 453 std::thread syncThread3([&]() {
454 ASSERT_EQ(delegate_->Sync(option, callback3), OK);
455 });
456 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
457 /**
458 * @tc.steps:step4. start sync task 4th.
459 * @tc.expected: task OK because it cannot be merged.
460 */
__anon19125ece1102(const std::map<std::string, SyncProcess> &process) 461 auto callback4 = [](const std::map<std::string, SyncProcess> &process) {
462 for (const auto &item: process) {
463 ASSERT_EQ(item.second.errCode, OK);
464 }
465 };
__anon19125ece1202() 466 std::thread syncThread4([&]() {
467 option.merge = false;
468 ASSERT_EQ(delegate_->Sync(option, callback4), OK);
469 });
470
471 syncThread1.join();
472 syncThread2.join();
473 syncThread3.join();
474 syncThread4.join();
475 }
476
477 /**
478 * @tc.name: CloudSyncMergeTaskTest004
479 * @tc.desc: test merge sync task with async download asset
480 * @tc.type: FUNC
481 * @tc.require:
482 * @tc.author: zqq
483 */
484 HWTEST_F(DistributedDBCloudTaskMergeTest, CloudSyncMergeTaskTest004, TestSize.Level0)
485 {
486 size_t finishCount = 0u;
487 std::mutex callbackMutex;
488 std::condition_variable callbackCv;
__anon19125ece1302(DBStatus status) 489 auto callback = GetProcessCallback([](DBStatus status) {
490 EXPECT_EQ(status, OK);
491 }, callbackMutex, callbackCv, finishCount);
492
493 Query normalQuery = Query::Select().FromTable({ tableNameA_ });
494 CloudSyncOption option;
495 PrepareOption(option, normalQuery, true);
496 ASSERT_EQ(delegate_->Sync(option, callback), OK);
497 option.asyncDownloadAssets = true;
498 ASSERT_EQ(delegate_->Sync(option, callback), OK);
499
500 std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon19125ece1402() 501 callbackCv.wait(callbackLock, [&finishCount]() {
502 return (finishCount >= 2u); // download 2 times
503 });
504 }
505 }
506 #endif