• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifdef RELATIONAL_STORE
17 #include <gtest/gtest.h>
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud/cloud_db_types.h"
20 #include "cloud_db_sync_utils_test.h"
21 #include "db_common.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "log_print.h"
24 #include "relational_store_delegate.h"
25 #include "relational_store_instance.h"
26 #include "relational_store_manager.h"
27 #include "relational_sync_able_storage.h"
28 #include "runtime_config.h"
29 #include "time_helper.h"
30 #include "virtual_asset_loader.h"
31 #include "virtual_cloud_data_translate.h"
32 #include "virtual_cloud_db.h"
33 
34 using namespace testing::ext;
35 using namespace DistributedDB;
36 using namespace DistributedDBUnitTest;
37 
38 namespace {
39 constexpr const char *DB_SUFFIX = ".db";
40 constexpr const char *STORE_ID = "Relational_Store_ID";
41 constexpr const char *CREATE_TABLE_A_SQL =
42     "CREATE TABLE IF NOT EXISTS worker_a(" \
43     "id TEXT PRIMARY KEY," \
44     "name TEXT," \
45     "height REAL ," \
46     "photo BLOB," \
47     "age INT);";
48 constexpr const char *CREATE_TABLE_B_SQL =
49     "CREATE TABLE IF NOT EXISTS worker_b(" \
50     "id TEXT PRIMARY KEY," \
51     "name TEXT," \
52     "height REAL ," \
53     "photo BLOB," \
54     "age INT);";
55 constexpr const char *CREATE_TABLE_C_SQL =
56     "CREATE TABLE IF NOT EXISTS worker_c(" \
57     "id TEXT PRIMARY KEY," \
58     "name TEXT," \
59     "height REAL ," \
60     "photo BLOB," \
61     "age INT);";
62 constexpr const char *CREATE_TABLE_D_SQL =
63     "CREATE TABLE IF NOT EXISTS worker_d(" \
64     "id TEXT PRIMARY KEY," \
65     "name TEXT," \
66     "height REAL ," \
67     "photo BLOB," \
68     "age INT);";
69 const int64_t SYNC_WAIT_TIME = 60;
70 
CreateUserDBAndTable(sqlite3 * & db)71 void CreateUserDBAndTable(sqlite3 *&db)
72 {
73     EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
74     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_A_SQL), SQLITE_OK);
75     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_B_SQL), SQLITE_OK);
76     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_C_SQL), SQLITE_OK);
77     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_D_SQL), SQLITE_OK);
78 }
79 
PrepareOption(CloudSyncOption & option,const Query & query,bool merge=false)80 void PrepareOption(CloudSyncOption &option, const Query &query, bool merge = false)
81 {
82     option.devices = { "CLOUD" };
83     option.mode = SYNC_MODE_CLOUD_MERGE;
84     option.query = query;
85     option.waitTime = SYNC_WAIT_TIME;
86     option.priorityTask = false;
87     option.compensatedSyncOnly = false;
88     option.merge = merge;
89 }
90 
91 class DistributedDBCloudTaskMergeTest : public testing::Test {
92 public:
93     static void SetUpTestCase();
94     static void TearDownTestCase();
95     void SetUp() override;
96     void TearDown() override;
97 protected:
98     void InitTestDir();
99     DataBaseSchema GetSchema();
100     void CloseDb();
101     void InsertUserTableRecord(const std::string &tableName, int64_t recordCounts, int64_t begin = 0);
102     void CheckCloudTableCount(const std::vector<std::string> &tableName, int64_t expectCount);
103     void SetForkQueryForCloudMergeSyncTest001(std::atomic<int> &count);
104     std::string testDir_;
105     std::string storePath_;
106     sqlite3 *db_ = nullptr;
107     RelationalStoreDelegate *delegate_ = nullptr;
108     std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
109     std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
110     std::shared_ptr<RelationalStoreManager> mgr_ = nullptr;
111     std::string tableNameA_ = "worker_a";
112     std::string tableNameB_ = "worker_b";
113     std::string tableNameC_ = "worker_c";
114     std::string tableNameD_ = "worker_d";
115     std::vector<std::string> tables_ = { tableNameA_, tableNameB_, tableNameC_, tableNameD_ };
116 };
117 
SetUpTestCase()118 void DistributedDBCloudTaskMergeTest::SetUpTestCase()
119 {
120 }
121 
TearDownTestCase()122 void DistributedDBCloudTaskMergeTest::TearDownTestCase()
123 {
124 }
125 
SetUp()126 void DistributedDBCloudTaskMergeTest::SetUp()
127 {
128     DistributedDBToolsUnitTest::PrintTestCaseInfo();
129     InitTestDir();
130     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != 0) {
131         LOGE("rm test db files error.");
132     }
133     DistributedDBToolsUnitTest::PrintTestCaseInfo();
134     LOGD("Test dir is %s", testDir_.c_str());
135     db_ = RelationalTestUtils::CreateDataBase(storePath_);
136     ASSERT_NE(db_, nullptr);
137     CreateUserDBAndTable(db_);
138     mgr_ = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
139     RelationalStoreDelegate::Option option;
140     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
141     ASSERT_NE(delegate_, nullptr);
142     for (const auto &table : tables_) {
143         ASSERT_EQ(delegate_->CreateDistributedTable(table, CLOUD_COOPERATION), DBStatus::OK);
144     }
145     virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
146     virtualAssetLoader_ = std::make_shared<VirtualAssetLoader>();
147     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
148     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
149     DataBaseSchema dataBaseSchema = GetSchema();
150     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
151 }
152 
TearDown()153 void DistributedDBCloudTaskMergeTest::TearDown()
154 {
155     virtualCloudDb_->ForkQuery(nullptr);
156     virtualCloudDb_->SetCloudError(false);
157     CloseDb();
158     EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
159     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != E_OK) {
160         LOGE("rm test db files error.");
161     }
162 }
163 
InitTestDir()164 void DistributedDBCloudTaskMergeTest::InitTestDir()
165 {
166     if (!testDir_.empty()) {
167         return;
168     }
169     DistributedDBToolsUnitTest::TestDirInit(testDir_);
170     storePath_ = testDir_ + "/" + STORE_ID_1 + ".db";
171     LOGI("The test db is:%s", testDir_.c_str());
172 }
173 
GetSchema()174 DataBaseSchema DistributedDBCloudTaskMergeTest::GetSchema()
175 {
176     DataBaseSchema schema;
177     for (const auto &table : tables_) {
178         TableSchema tableSchema;
179         tableSchema.name = table;
180         tableSchema.fields = {
181             {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
182             {"photo", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
183         };
184         schema.tables.push_back(tableSchema);
185     }
186     return schema;
187 }
188 
CloseDb()189 void DistributedDBCloudTaskMergeTest::CloseDb()
190 {
191     virtualCloudDb_ = nullptr;
192     if (mgr_ != nullptr) {
193         EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
194         delegate_ = nullptr;
195         mgr_ = nullptr;
196     }
197 }
198 
InsertUserTableRecord(const std::string & tableName,int64_t recordCounts,int64_t begin)199 void DistributedDBCloudTaskMergeTest::InsertUserTableRecord(const std::string &tableName,
200     int64_t recordCounts, int64_t begin)
201 {
202     ASSERT_NE(db_, nullptr);
203     for (int64_t i = begin; i < begin + recordCounts; ++i) {
204         string sql = "INSERT OR REPLACE INTO " + tableName +
205             " (id, name, height, photo, age) VALUES ('" + std::to_string(i) + "', 'Local" +
206             std::to_string(i) + "', '155.10',  'text', '21');";
207         ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
208     }
209 }
210 
CheckCloudTableCount(const std::vector<std::string> & tableNames,int64_t expectCount)211 void DistributedDBCloudTaskMergeTest::CheckCloudTableCount(const std::vector<std::string> &tableNames,
212     int64_t expectCount)
213 {
214     for (const auto &tableName : tableNames) {
215         VBucket extend;
216         extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
217         int64_t realCount = 0;
218         std::vector<VBucket> data;
219         virtualCloudDb_->Query(tableName, extend, data);
220         for (size_t j = 0; j < data.size(); ++j) {
221             auto entry = data[j].find(CloudDbConstant::DELETE_FIELD);
222             if (entry != data[j].end() && std::get<bool>(entry->second)) {
223                 continue;
224             }
225             realCount++;
226         }
227         LOGI("check table %s", tableName.c_str());
228         EXPECT_EQ(realCount, expectCount); // ExpectCount represents the total amount of cloud data.
229     }
230 }
231 
SetForkQueryForCloudMergeSyncTest001(std::atomic<int> & count)232 void DistributedDBCloudTaskMergeTest::SetForkQueryForCloudMergeSyncTest001(std::atomic<int> &count)
233 {
234     virtualCloudDb_->ForkQuery([&count](const std::string &, VBucket &) {
235         count++;
236         if (count == 1) { // taskid1
237             std::this_thread::sleep_for(std::chrono::seconds(1));
238         }
239     });
240 }
241 
242 /**
243  * @tc.name: CloudSyncMergeTaskTest001
244  * @tc.desc: test merge sync task
245  * @tc.type: FUNC
246  * @tc.require:
247  * @tc.author: chenchaohao
248  */
249 HWTEST_F(DistributedDBCloudTaskMergeTest, CloudSyncMergeTaskTest001, TestSize.Level0)
250 {
251     /**
252      * @tc.steps:step1. insert user table record.
253      * @tc.expected: step1. ok.
254      */
255     const int actualCount = 10; // 10 is count of records
256     InsertUserTableRecord(tableNameA_, actualCount);
257     InsertUserTableRecord(tableNameB_, actualCount);
258     /**
259      * @tc.steps:step2. set callback to check during sync.
260      * @tc.expected: step2. ok.
261      */
262     std::atomic<int> count = 0;
263     SetForkQueryForCloudMergeSyncTest001(count);
264 
265     Query normalQuery1 = Query::Select().FromTable({ tableNameA_ });
266     CloudSyncOption option;
267     PrepareOption(option, normalQuery1, false);
268     ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
269 
270     std::mutex callbackMutex;
271     std::condition_variable callbackCv;
272     size_t finishCount = 0u;
__anon19ad9a8e0302(const std::map<std::string, SyncProcess> &process) 273     auto callback1 = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
274         for (const auto &item: process) {
275             if (item.second.process == DistributedDB::FINISHED) {
276                 {
277                     std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
278                     finishCount++;
279                 }
280                 LOGW("current finish %zu", finishCount);
281                 callbackCv.notify_one();
282             }
283         }
284     };
285 
286     Query normalQuery2 = Query::Select().FromTable({ tableNameB_ });
287     PrepareOption(option, normalQuery2, true);
288     ASSERT_EQ(delegate_->Sync(option, callback1), OK);
289 
290     InsertUserTableRecord(tableNameC_, actualCount);
291     InsertUserTableRecord(tableNameD_, actualCount);
292 
293     Query normalQuery3 = Query::Select().FromTable({ tableNameC_, tableNameD_ });
294     PrepareOption(option, normalQuery3, true);
295     ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
296 
297     Query normalQuery4 = Query::Select().FromTable({ tableNameB_, tableNameC_, tableNameD_ });
298     PrepareOption(option, normalQuery4, true);
299     ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
300     std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon19ad9a8e0402() 301     callbackCv.wait(callbackLock, [&finishCount]() {
302         return (finishCount >= 1u);
303     });
304     CheckCloudTableCount({ tableNameB_, tableNameC_, tableNameD_ }, actualCount);
305 }
306 
307 /**
308  * @tc.name: CloudSyncMergeTaskTest002
309  * @tc.desc: test merge sync task with different mode.
310  * @tc.type: FUNC
311  * @tc.require:
312  * @tc.author: liaoyonghuang
313  */
314 HWTEST_F(DistributedDBCloudTaskMergeTest, CloudSyncMergeTaskTest002, TestSize.Level1)
315 {
316     /**
317      * @tc.steps:step1. insert user table record.
318      * @tc.expected: step1. ok.
319      */
320     const int actualCount = 10; // 10 is count of records
321     InsertUserTableRecord(tableNameA_, actualCount);
322     Query normalQuery1 = Query::Select().FromTable({ tableNameA_ });
323     CloudSyncOption option;
324     PrepareOption(option, normalQuery1, true);
325     /**
326      * @tc.steps:step2. set 2s block time for sync task 1st, and start sync task 2nd.
327      * @tc.expected: step2. ok.
328      */
329     virtualCloudDb_->SetBlockTime(2000); // block 1st sync task 2s.
__anon19ad9a8e0502() 330     std::thread syncThread1([&]() {
331         ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
332     });
333     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
__anon19ad9a8e0602() 334     std::thread syncThread2([&]() {
335         ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
336     });
337     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
338 
339     /**
340      * @tc.steps:step3. start sync task 3rd.
341      * @tc.expected: task CLOUD_SYNC_TASK_MERGED because it was merged into Task 2.
342      */
__anon19ad9a8e0702(const std::map<std::string, SyncProcess> &process) 343     auto callback3 = [](const std::map<std::string, SyncProcess> &process) {
344         for (const auto &item: process) {
345             ASSERT_EQ(item.second.errCode, CLOUD_SYNC_TASK_MERGED);
346         }
347     };
__anon19ad9a8e0802() 348     std::thread syncThread3([&]() {
349         ASSERT_EQ(delegate_->Sync(option, callback3), OK);
350     });
351     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
352 
353     /**
354      * @tc.steps:step4. start sync task 4th.
355      * @tc.expected: task was not merged because the mode is not SYNC_MODE_CLOUD_MERGE.
356      */
__anon19ad9a8e0902(const std::map<std::string, SyncProcess> &process) 357     auto callback4 = [](const std::map<std::string, SyncProcess> &process) {
358         for (const auto &item: process) {
359             ASSERT_EQ(item.second.errCode, OK);
360         }
361     };
__anon19ad9a8e0a02() 362     std::thread syncThread4([&]() {
363         option.mode = SYNC_MODE_CLOUD_FORCE_PUSH;
364         ASSERT_EQ(delegate_->Sync(option, callback4), OK);
365     });
366     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
367 
368     /**
369      * @tc.steps:step5. start sync task 5th.
370      * @tc.expected: task CLOUD_SYNC_TASK_MERGED because it was merged into Task 2.
371      */
__anon19ad9a8e0b02(const std::map<std::string, SyncProcess> &process) 372     auto callback5 = [](const std::map<std::string, SyncProcess> &process) {
373         for (const auto &item: process) {
374             ASSERT_EQ(item.second.errCode, CLOUD_SYNC_TASK_MERGED);
375         }
376     };
__anon19ad9a8e0c02() 377     std::thread syncThread5([&]() {
378         option.mode = SYNC_MODE_CLOUD_MERGE;
379         ASSERT_EQ(delegate_->Sync(option, callback5), OK);
380     });
381 
382     syncThread1.join();
383     syncThread2.join();
384     syncThread3.join();
385     syncThread4.join();
386     syncThread5.join();
387 }
388 
389 /**
390  * @tc.name: CloudSyncMergeTaskTest003
391  * @tc.desc: test merge sync task which merge is false.
392  * @tc.type: FUNC
393  * @tc.require:
394  * @tc.author: liaoyonghuang
395  */
396 HWTEST_F(DistributedDBCloudTaskMergeTest, CloudSyncMergeTaskTest003, TestSize.Level1)
397 {
398     /**
399      * @tc.steps:step1. insert user table record.
400      * @tc.expected: step1. ok.
401      */
402     const int actualCount = 10; // 10 is count of records
403     InsertUserTableRecord(tableNameA_, actualCount);
404     Query normalQuery1 = Query::Select().FromTable({ tableNameA_ });
405     CloudSyncOption option;
406     PrepareOption(option, normalQuery1, true);
407     /**
408      * @tc.steps:step2. set 2s block time for sync task 1st, and start sync task 2nd.
409      * @tc.expected: step2. ok.
410      */
411     virtualCloudDb_->SetBlockTime(2000); // block 1st sync task 2s.
__anon19ad9a8e0d02() 412     std::thread syncThread1([&]() {
413         ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
414     });
415     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
__anon19ad9a8e0e02() 416     std::thread syncThread2([&]() {
417         ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
418     });
419     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
420     /**
421      * @tc.steps:step3. start sync task 3rd.
422      * @tc.expected: task CLOUD_SYNC_TASK_MERGED because it was merged into Task 2.
423      */
__anon19ad9a8e0f02(const std::map<std::string, SyncProcess> &process) 424     auto callback3 = [](const std::map<std::string, SyncProcess> &process) {
425         for (const auto &item: process) {
426             ASSERT_EQ(item.second.errCode, CLOUD_SYNC_TASK_MERGED);
427             EXPECT_EQ(item.second.tableProcess.size(), 1u);
428             for (const auto &table : item.second.tableProcess) {
429                 EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
430             }
431         }
432     };
__anon19ad9a8e1002() 433     std::thread syncThread3([&]() {
434         ASSERT_EQ(delegate_->Sync(option, callback3), OK);
435     });
436     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
437     /**
438      * @tc.steps:step4. start sync task 4th.
439      * @tc.expected: task OK because it cannot be merged.
440      */
__anon19ad9a8e1102(const std::map<std::string, SyncProcess> &process) 441     auto callback4 = [](const std::map<std::string, SyncProcess> &process) {
442         for (const auto &item: process) {
443             ASSERT_EQ(item.second.errCode, OK);
444         }
445     };
__anon19ad9a8e1202() 446     std::thread syncThread4([&]() {
447         option.merge = false;
448         ASSERT_EQ(delegate_->Sync(option, callback4), OK);
449     });
450 
451     syncThread1.join();
452     syncThread2.join();
453     syncThread3.join();
454     syncThread4.join();
455 }
456 }
457 #endif