1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifdef RELATIONAL_STORE
17 #include <gtest/gtest.h>
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud/cloud_db_types.h"
20 #include "cloud_db_sync_utils_test.h"
21 #include "db_common.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "log_print.h"
24 #include "relational_store_delegate.h"
25 #include "relational_store_instance.h"
26 #include "relational_store_manager.h"
27 #include "relational_sync_able_storage.h"
28 #include "runtime_config.h"
29 #include "time_helper.h"
30 #include "virtual_asset_loader.h"
31 #include "virtual_cloud_data_translate.h"
32 #include "virtual_cloud_db.h"
33
34 using namespace testing::ext;
35 using namespace DistributedDB;
36 using namespace DistributedDBUnitTest;
37
38 namespace {
39 constexpr const char *DB_SUFFIX = ".db";
40 constexpr const char *STORE_ID = "Relational_Store_ID";
41 constexpr const char *CREATE_TABLE_A_SQL =
42 "CREATE TABLE IF NOT EXISTS worker_a(" \
43 "id TEXT PRIMARY KEY," \
44 "name TEXT," \
45 "height REAL ," \
46 "photo BLOB," \
47 "age INT);";
48 constexpr const char *CREATE_TABLE_B_SQL =
49 "CREATE TABLE IF NOT EXISTS worker_b(" \
50 "id TEXT PRIMARY KEY," \
51 "name TEXT," \
52 "height REAL ," \
53 "photo BLOB," \
54 "age INT);";
55 constexpr const char *CREATE_TABLE_C_SQL =
56 "CREATE TABLE IF NOT EXISTS worker_c(" \
57 "id TEXT PRIMARY KEY," \
58 "name TEXT," \
59 "height REAL ," \
60 "photo BLOB," \
61 "age INT);";
62 constexpr const char *CREATE_TABLE_D_SQL =
63 "CREATE TABLE IF NOT EXISTS worker_d(" \
64 "id TEXT PRIMARY KEY," \
65 "name TEXT," \
66 "height REAL ," \
67 "photo BLOB," \
68 "age INT);";
69 const int64_t SYNC_WAIT_TIME = 60;
70
CreateUserDBAndTable(sqlite3 * & db)71 void CreateUserDBAndTable(sqlite3 *&db)
72 {
73 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
74 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_A_SQL), SQLITE_OK);
75 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_B_SQL), SQLITE_OK);
76 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_C_SQL), SQLITE_OK);
77 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_D_SQL), SQLITE_OK);
78 }
79
PrepareOption(CloudSyncOption & option,const Query & query,bool merge=false)80 void PrepareOption(CloudSyncOption &option, const Query &query, bool merge = false)
81 {
82 option.devices = { "CLOUD" };
83 option.mode = SYNC_MODE_CLOUD_MERGE;
84 option.query = query;
85 option.waitTime = SYNC_WAIT_TIME;
86 option.priorityTask = false;
87 option.compensatedSyncOnly = false;
88 option.merge = merge;
89 }
90
91 class DistributedDBCloudTaskMergeTest : public testing::Test {
92 public:
93 static void SetUpTestCase();
94 static void TearDownTestCase();
95 void SetUp() override;
96 void TearDown() override;
97 protected:
98 void InitTestDir();
99 DataBaseSchema GetSchema();
100 void CloseDb();
101 void InsertUserTableRecord(const std::string &tableName, int64_t recordCounts, int64_t begin = 0);
102 void CheckCloudTableCount(const std::vector<std::string> &tableName, int64_t expectCount);
103 void SetForkQueryForCloudMergeSyncTest001(std::atomic<int> &count);
104 std::string testDir_;
105 std::string storePath_;
106 sqlite3 *db_ = nullptr;
107 RelationalStoreDelegate *delegate_ = nullptr;
108 std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
109 std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
110 std::shared_ptr<RelationalStoreManager> mgr_ = nullptr;
111 std::string tableNameA_ = "worker_a";
112 std::string tableNameB_ = "worker_b";
113 std::string tableNameC_ = "worker_c";
114 std::string tableNameD_ = "worker_d";
115 std::vector<std::string> tables_ = { tableNameA_, tableNameB_, tableNameC_, tableNameD_ };
116 };
117
SetUpTestCase()118 void DistributedDBCloudTaskMergeTest::SetUpTestCase()
119 {
120 }
121
TearDownTestCase()122 void DistributedDBCloudTaskMergeTest::TearDownTestCase()
123 {
124 }
125
SetUp()126 void DistributedDBCloudTaskMergeTest::SetUp()
127 {
128 DistributedDBToolsUnitTest::PrintTestCaseInfo();
129 InitTestDir();
130 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != 0) {
131 LOGE("rm test db files error.");
132 }
133 DistributedDBToolsUnitTest::PrintTestCaseInfo();
134 LOGD("Test dir is %s", testDir_.c_str());
135 db_ = RelationalTestUtils::CreateDataBase(storePath_);
136 ASSERT_NE(db_, nullptr);
137 CreateUserDBAndTable(db_);
138 mgr_ = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
139 RelationalStoreDelegate::Option option;
140 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
141 ASSERT_NE(delegate_, nullptr);
142 for (const auto &table : tables_) {
143 ASSERT_EQ(delegate_->CreateDistributedTable(table, CLOUD_COOPERATION), DBStatus::OK);
144 }
145 virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
146 virtualAssetLoader_ = std::make_shared<VirtualAssetLoader>();
147 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
148 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
149 DataBaseSchema dataBaseSchema = GetSchema();
150 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
151 }
152
TearDown()153 void DistributedDBCloudTaskMergeTest::TearDown()
154 {
155 virtualCloudDb_->ForkQuery(nullptr);
156 virtualCloudDb_->SetCloudError(false);
157 CloseDb();
158 EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
159 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != E_OK) {
160 LOGE("rm test db files error.");
161 }
162 }
163
InitTestDir()164 void DistributedDBCloudTaskMergeTest::InitTestDir()
165 {
166 if (!testDir_.empty()) {
167 return;
168 }
169 DistributedDBToolsUnitTest::TestDirInit(testDir_);
170 storePath_ = testDir_ + "/" + STORE_ID_1 + ".db";
171 LOGI("The test db is:%s", testDir_.c_str());
172 }
173
GetSchema()174 DataBaseSchema DistributedDBCloudTaskMergeTest::GetSchema()
175 {
176 DataBaseSchema schema;
177 for (const auto &table : tables_) {
178 TableSchema tableSchema;
179 tableSchema.name = table;
180 tableSchema.fields = {
181 {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
182 {"photo", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
183 };
184 schema.tables.push_back(tableSchema);
185 }
186 return schema;
187 }
188
CloseDb()189 void DistributedDBCloudTaskMergeTest::CloseDb()
190 {
191 virtualCloudDb_ = nullptr;
192 if (mgr_ != nullptr) {
193 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
194 delegate_ = nullptr;
195 mgr_ = nullptr;
196 }
197 }
198
InsertUserTableRecord(const std::string & tableName,int64_t recordCounts,int64_t begin)199 void DistributedDBCloudTaskMergeTest::InsertUserTableRecord(const std::string &tableName,
200 int64_t recordCounts, int64_t begin)
201 {
202 ASSERT_NE(db_, nullptr);
203 for (int64_t i = begin; i < begin + recordCounts; ++i) {
204 string sql = "INSERT OR REPLACE INTO " + tableName +
205 " (id, name, height, photo, age) VALUES ('" + std::to_string(i) + "', 'Local" +
206 std::to_string(i) + "', '155.10', 'text', '21');";
207 ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
208 }
209 }
210
CheckCloudTableCount(const std::vector<std::string> & tableNames,int64_t expectCount)211 void DistributedDBCloudTaskMergeTest::CheckCloudTableCount(const std::vector<std::string> &tableNames,
212 int64_t expectCount)
213 {
214 for (const auto &tableName : tableNames) {
215 VBucket extend;
216 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
217 int64_t realCount = 0;
218 std::vector<VBucket> data;
219 virtualCloudDb_->Query(tableName, extend, data);
220 for (size_t j = 0; j < data.size(); ++j) {
221 auto entry = data[j].find(CloudDbConstant::DELETE_FIELD);
222 if (entry != data[j].end() && std::get<bool>(entry->second)) {
223 continue;
224 }
225 realCount++;
226 }
227 LOGI("check table %s", tableName.c_str());
228 EXPECT_EQ(realCount, expectCount); // ExpectCount represents the total amount of cloud data.
229 }
230 }
231
SetForkQueryForCloudMergeSyncTest001(std::atomic<int> & count)232 void DistributedDBCloudTaskMergeTest::SetForkQueryForCloudMergeSyncTest001(std::atomic<int> &count)
233 {
234 virtualCloudDb_->ForkQuery([&count](const std::string &, VBucket &) {
235 count++;
236 if (count == 1) { // taskid1
237 std::this_thread::sleep_for(std::chrono::seconds(1));
238 }
239 });
240 }
241
242 /**
243 * @tc.name: CloudSyncMergeTaskTest001
244 * @tc.desc: test merge sync task
245 * @tc.type: FUNC
246 * @tc.require:
247 * @tc.author: chenchaohao
248 */
249 HWTEST_F(DistributedDBCloudTaskMergeTest, CloudSyncMergeTaskTest001, TestSize.Level0)
250 {
251 /**
252 * @tc.steps:step1. insert user table record.
253 * @tc.expected: step1. ok.
254 */
255 const int actualCount = 10; // 10 is count of records
256 InsertUserTableRecord(tableNameA_, actualCount);
257 InsertUserTableRecord(tableNameB_, actualCount);
258 /**
259 * @tc.steps:step2. set callback to check during sync.
260 * @tc.expected: step2. ok.
261 */
262 std::atomic<int> count = 0;
263 SetForkQueryForCloudMergeSyncTest001(count);
264
265 Query normalQuery1 = Query::Select().FromTable({ tableNameA_ });
266 CloudSyncOption option;
267 PrepareOption(option, normalQuery1, false);
268 ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
269
270 std::mutex callbackMutex;
271 std::condition_variable callbackCv;
272 size_t finishCount = 0u;
__anon19ad9a8e0302(const std::map<std::string, SyncProcess> &process) 273 auto callback1 = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
274 for (const auto &item: process) {
275 if (item.second.process == DistributedDB::FINISHED) {
276 {
277 std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
278 finishCount++;
279 }
280 LOGW("current finish %zu", finishCount);
281 callbackCv.notify_one();
282 }
283 }
284 };
285
286 Query normalQuery2 = Query::Select().FromTable({ tableNameB_ });
287 PrepareOption(option, normalQuery2, true);
288 ASSERT_EQ(delegate_->Sync(option, callback1), OK);
289
290 InsertUserTableRecord(tableNameC_, actualCount);
291 InsertUserTableRecord(tableNameD_, actualCount);
292
293 Query normalQuery3 = Query::Select().FromTable({ tableNameC_, tableNameD_ });
294 PrepareOption(option, normalQuery3, true);
295 ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
296
297 Query normalQuery4 = Query::Select().FromTable({ tableNameB_, tableNameC_, tableNameD_ });
298 PrepareOption(option, normalQuery4, true);
299 ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
300 std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon19ad9a8e0402() 301 callbackCv.wait(callbackLock, [&finishCount]() {
302 return (finishCount >= 1u);
303 });
304 CheckCloudTableCount({ tableNameB_, tableNameC_, tableNameD_ }, actualCount);
305 }
306
307 /**
308 * @tc.name: CloudSyncMergeTaskTest002
309 * @tc.desc: test merge sync task with different mode.
310 * @tc.type: FUNC
311 * @tc.require:
312 * @tc.author: liaoyonghuang
313 */
314 HWTEST_F(DistributedDBCloudTaskMergeTest, CloudSyncMergeTaskTest002, TestSize.Level1)
315 {
316 /**
317 * @tc.steps:step1. insert user table record.
318 * @tc.expected: step1. ok.
319 */
320 const int actualCount = 10; // 10 is count of records
321 InsertUserTableRecord(tableNameA_, actualCount);
322 Query normalQuery1 = Query::Select().FromTable({ tableNameA_ });
323 CloudSyncOption option;
324 PrepareOption(option, normalQuery1, true);
325 /**
326 * @tc.steps:step2. set 2s block time for sync task 1st, and start sync task 2nd.
327 * @tc.expected: step2. ok.
328 */
329 virtualCloudDb_->SetBlockTime(2000); // block 1st sync task 2s.
__anon19ad9a8e0502() 330 std::thread syncThread1([&]() {
331 ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
332 });
333 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
__anon19ad9a8e0602() 334 std::thread syncThread2([&]() {
335 ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
336 });
337 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
338
339 /**
340 * @tc.steps:step3. start sync task 3rd.
341 * @tc.expected: task CLOUD_SYNC_TASK_MERGED because it was merged into Task 2.
342 */
__anon19ad9a8e0702(const std::map<std::string, SyncProcess> &process) 343 auto callback3 = [](const std::map<std::string, SyncProcess> &process) {
344 for (const auto &item: process) {
345 ASSERT_EQ(item.second.errCode, CLOUD_SYNC_TASK_MERGED);
346 }
347 };
__anon19ad9a8e0802() 348 std::thread syncThread3([&]() {
349 ASSERT_EQ(delegate_->Sync(option, callback3), OK);
350 });
351 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
352
353 /**
354 * @tc.steps:step4. start sync task 4th.
355 * @tc.expected: task was not merged because the mode is not SYNC_MODE_CLOUD_MERGE.
356 */
__anon19ad9a8e0902(const std::map<std::string, SyncProcess> &process) 357 auto callback4 = [](const std::map<std::string, SyncProcess> &process) {
358 for (const auto &item: process) {
359 ASSERT_EQ(item.second.errCode, OK);
360 }
361 };
__anon19ad9a8e0a02() 362 std::thread syncThread4([&]() {
363 option.mode = SYNC_MODE_CLOUD_FORCE_PUSH;
364 ASSERT_EQ(delegate_->Sync(option, callback4), OK);
365 });
366 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
367
368 /**
369 * @tc.steps:step5. start sync task 5th.
370 * @tc.expected: task CLOUD_SYNC_TASK_MERGED because it was merged into Task 2.
371 */
__anon19ad9a8e0b02(const std::map<std::string, SyncProcess> &process) 372 auto callback5 = [](const std::map<std::string, SyncProcess> &process) {
373 for (const auto &item: process) {
374 ASSERT_EQ(item.second.errCode, CLOUD_SYNC_TASK_MERGED);
375 }
376 };
__anon19ad9a8e0c02() 377 std::thread syncThread5([&]() {
378 option.mode = SYNC_MODE_CLOUD_MERGE;
379 ASSERT_EQ(delegate_->Sync(option, callback5), OK);
380 });
381
382 syncThread1.join();
383 syncThread2.join();
384 syncThread3.join();
385 syncThread4.join();
386 syncThread5.join();
387 }
388
389 /**
390 * @tc.name: CloudSyncMergeTaskTest003
391 * @tc.desc: test merge sync task which merge is false.
392 * @tc.type: FUNC
393 * @tc.require:
394 * @tc.author: liaoyonghuang
395 */
396 HWTEST_F(DistributedDBCloudTaskMergeTest, CloudSyncMergeTaskTest003, TestSize.Level1)
397 {
398 /**
399 * @tc.steps:step1. insert user table record.
400 * @tc.expected: step1. ok.
401 */
402 const int actualCount = 10; // 10 is count of records
403 InsertUserTableRecord(tableNameA_, actualCount);
404 Query normalQuery1 = Query::Select().FromTable({ tableNameA_ });
405 CloudSyncOption option;
406 PrepareOption(option, normalQuery1, true);
407 /**
408 * @tc.steps:step2. set 2s block time for sync task 1st, and start sync task 2nd.
409 * @tc.expected: step2. ok.
410 */
411 virtualCloudDb_->SetBlockTime(2000); // block 1st sync task 2s.
__anon19ad9a8e0d02() 412 std::thread syncThread1([&]() {
413 ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
414 });
415 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
__anon19ad9a8e0e02() 416 std::thread syncThread2([&]() {
417 ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
418 });
419 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
420 /**
421 * @tc.steps:step3. start sync task 3rd.
422 * @tc.expected: task CLOUD_SYNC_TASK_MERGED because it was merged into Task 2.
423 */
__anon19ad9a8e0f02(const std::map<std::string, SyncProcess> &process) 424 auto callback3 = [](const std::map<std::string, SyncProcess> &process) {
425 for (const auto &item: process) {
426 ASSERT_EQ(item.second.errCode, CLOUD_SYNC_TASK_MERGED);
427 EXPECT_EQ(item.second.tableProcess.size(), 1u);
428 for (const auto &table : item.second.tableProcess) {
429 EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
430 }
431 }
432 };
__anon19ad9a8e1002() 433 std::thread syncThread3([&]() {
434 ASSERT_EQ(delegate_->Sync(option, callback3), OK);
435 });
436 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
437 /**
438 * @tc.steps:step4. start sync task 4th.
439 * @tc.expected: task OK because it cannot be merged.
440 */
__anon19ad9a8e1102(const std::map<std::string, SyncProcess> &process) 441 auto callback4 = [](const std::map<std::string, SyncProcess> &process) {
442 for (const auto &item: process) {
443 ASSERT_EQ(item.second.errCode, OK);
444 }
445 };
__anon19ad9a8e1202() 446 std::thread syncThread4([&]() {
447 option.merge = false;
448 ASSERT_EQ(delegate_->Sync(option, callback4), OK);
449 });
450
451 syncThread1.join();
452 syncThread2.join();
453 syncThread3.join();
454 syncThread4.join();
455 }
456 }
457 #endif