• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifdef RELATIONAL_STORE
17 #include <gtest/gtest.h>
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud/cloud_db_types.h"
20 #include "cloud_db_sync_utils_test.h"
21 #include "db_common.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "log_print.h"
24 #include "relational_store_delegate.h"
25 #include "relational_store_instance.h"
26 #include "relational_store_manager.h"
27 #include "relational_sync_able_storage.h"
28 #include "runtime_config.h"
29 #include "time_helper.h"
30 #include "virtual_asset_loader.h"
31 #include "virtual_cloud_data_translate.h"
32 #include "virtual_cloud_db.h"
33 #include "virtual_communicator_aggregator.h"
34 
35 using namespace testing::ext;
36 using namespace DistributedDB;
37 using namespace DistributedDBUnitTest;
38 
39 namespace {
40 constexpr const char *DB_SUFFIX = ".db";
41 constexpr const char *STORE_ID = "Relational_Store_ID";
42 constexpr const char *CREATE_TABLE_A_SQL =
43     "CREATE TABLE IF NOT EXISTS worker_a(" \
44     "id TEXT PRIMARY KEY," \
45     "name TEXT," \
46     "height REAL ," \
47     "photo BLOB," \
48     "age INT);";
49 constexpr const char *CREATE_TABLE_B_SQL =
50     "CREATE TABLE IF NOT EXISTS worker_b(" \
51     "id TEXT PRIMARY KEY," \
52     "name TEXT," \
53     "height REAL ," \
54     "photo BLOB," \
55     "age INT);";
56 constexpr const char *CREATE_TABLE_C_SQL =
57     "CREATE TABLE IF NOT EXISTS worker_c(" \
58     "id TEXT PRIMARY KEY," \
59     "name TEXT," \
60     "height REAL ," \
61     "photo BLOB," \
62     "age INT);";
63 constexpr const char *CREATE_TABLE_D_SQL =
64     "CREATE TABLE IF NOT EXISTS worker_d(" \
65     "id TEXT PRIMARY KEY," \
66     "name TEXT," \
67     "height REAL ," \
68     "photo BLOB," \
69     "age INT);";
70 const int64_t SYNC_WAIT_TIME = 60;
71 
CreateUserDBAndTable(sqlite3 * & db)72 void CreateUserDBAndTable(sqlite3 *&db)
73 {
74     EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
75     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_A_SQL), SQLITE_OK);
76     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_B_SQL), SQLITE_OK);
77     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_C_SQL), SQLITE_OK);
78     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_D_SQL), SQLITE_OK);
79 }
80 
PrepareOption(CloudSyncOption & option,const Query & query,bool merge=false)81 void PrepareOption(CloudSyncOption &option, const Query &query, bool merge = false)
82 {
83     option.devices = { "CLOUD" };
84     option.mode = SYNC_MODE_CLOUD_MERGE;
85     option.query = query;
86     option.waitTime = SYNC_WAIT_TIME;
87     option.priorityTask = false;
88     option.compensatedSyncOnly = false;
89     option.merge = merge;
90 }
91 
92 class DistributedDBCloudTaskMergeTest : public testing::Test {
93 public:
94     static void SetUpTestCase();
95     static void TearDownTestCase();
96     void SetUp() override;
97     void TearDown() override;
98 protected:
99     void InitTestDir();
100     DataBaseSchema GetSchema();
101     void CloseDb();
102     void InsertUserTableRecord(const std::string &tableName, int64_t recordCounts, int64_t begin = 0);
103     void CheckCloudTableCount(const std::vector<std::string> &tableName, int64_t expectCount);
104     void SetForkQueryForCloudMergeSyncTest001(std::atomic<int> &count);
105     static SyncProcessCallback GetProcessCallback(const std::function<void(DBStatus)> &checkFinish,
106         std::mutex &callbackMutex, std::condition_variable &callbackCv, size_t &finishCount);
107     std::string testDir_;
108     std::string storePath_;
109     sqlite3 *db_ = nullptr;
110     RelationalStoreDelegate *delegate_ = nullptr;
111     std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
112     std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
113     std::shared_ptr<RelationalStoreManager> mgr_ = nullptr;
114     std::string tableNameA_ = "worker_a";
115     std::string tableNameB_ = "worker_b";
116     std::string tableNameC_ = "worker_c";
117     std::string tableNameD_ = "worker_d";
118     std::vector<std::string> tables_ = { tableNameA_, tableNameB_, tableNameC_, tableNameD_ };
119     VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
120 };
121 
SetUpTestCase()122 void DistributedDBCloudTaskMergeTest::SetUpTestCase()
123 {
124 }
125 
TearDownTestCase()126 void DistributedDBCloudTaskMergeTest::TearDownTestCase()
127 {
128 }
129 
SetUp()130 void DistributedDBCloudTaskMergeTest::SetUp()
131 {
132     DistributedDBToolsUnitTest::PrintTestCaseInfo();
133     InitTestDir();
134     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != 0) {
135         LOGE("rm test db files error.");
136     }
137     DistributedDBToolsUnitTest::PrintTestCaseInfo();
138     LOGD("Test dir is %s", testDir_.c_str());
139     db_ = RelationalTestUtils::CreateDataBase(storePath_);
140     ASSERT_NE(db_, nullptr);
141     CreateUserDBAndTable(db_);
142     mgr_ = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
143     RelationalStoreDelegate::Option option;
144     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
145     ASSERT_NE(delegate_, nullptr);
146     for (const auto &table : tables_) {
147         ASSERT_EQ(delegate_->CreateDistributedTable(table, CLOUD_COOPERATION), DBStatus::OK);
148     }
149     virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
150     virtualAssetLoader_ = std::make_shared<VirtualAssetLoader>();
151     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
152     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
153     DataBaseSchema dataBaseSchema = GetSchema();
154     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
155     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
156     ASSERT_TRUE(communicatorAggregator_ != nullptr);
157     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
158 }
159 
TearDown()160 void DistributedDBCloudTaskMergeTest::TearDown()
161 {
162     virtualCloudDb_->ForkQuery(nullptr);
163     virtualCloudDb_->SetCloudError(false);
164     CloseDb();
165     EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
166     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != E_OK) {
167         LOGE("rm test db files error.");
168     }
169     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
170     communicatorAggregator_ = nullptr;
171     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
172 }
173 
InitTestDir()174 void DistributedDBCloudTaskMergeTest::InitTestDir()
175 {
176     if (!testDir_.empty()) {
177         return;
178     }
179     DistributedDBToolsUnitTest::TestDirInit(testDir_);
180     storePath_ = testDir_ + "/" + STORE_ID_1 + ".db";
181     LOGI("The test db is:%s", testDir_.c_str());
182 }
183 
GetSchema()184 DataBaseSchema DistributedDBCloudTaskMergeTest::GetSchema()
185 {
186     DataBaseSchema schema;
187     for (const auto &table : tables_) {
188         TableSchema tableSchema;
189         tableSchema.name = table;
190         tableSchema.fields = {
191             {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
192             {"photo", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
193         };
194         schema.tables.push_back(tableSchema);
195     }
196     return schema;
197 }
198 
CloseDb()199 void DistributedDBCloudTaskMergeTest::CloseDb()
200 {
201     virtualCloudDb_ = nullptr;
202     if (mgr_ != nullptr) {
203         EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
204         delegate_ = nullptr;
205         mgr_ = nullptr;
206     }
207 }
208 
InsertUserTableRecord(const std::string & tableName,int64_t recordCounts,int64_t begin)209 void DistributedDBCloudTaskMergeTest::InsertUserTableRecord(const std::string &tableName,
210     int64_t recordCounts, int64_t begin)
211 {
212     ASSERT_NE(db_, nullptr);
213     for (int64_t i = begin; i < begin + recordCounts; ++i) {
214         string sql = "INSERT OR REPLACE INTO " + tableName +
215             " (id, name, height, photo, age) VALUES ('" + std::to_string(i) + "', 'Local" +
216             std::to_string(i) + "', '155.10',  'text', '21');";
217         ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
218     }
219 }
220 
CheckCloudTableCount(const std::vector<std::string> & tableNames,int64_t expectCount)221 void DistributedDBCloudTaskMergeTest::CheckCloudTableCount(const std::vector<std::string> &tableNames,
222     int64_t expectCount)
223 {
224     for (const auto &tableName : tableNames) {
225         VBucket extend;
226         extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
227         int64_t realCount = 0;
228         std::vector<VBucket> data;
229         virtualCloudDb_->Query(tableName, extend, data);
230         for (size_t j = 0; j < data.size(); ++j) {
231             auto entry = data[j].find(CloudDbConstant::DELETE_FIELD);
232             if (entry != data[j].end() && std::get<bool>(entry->second)) {
233                 continue;
234             }
235             realCount++;
236         }
237         LOGI("check table %s", tableName.c_str());
238         EXPECT_EQ(realCount, expectCount); // ExpectCount represents the total amount of cloud data.
239     }
240 }
241 
SetForkQueryForCloudMergeSyncTest001(std::atomic<int> & count)242 void DistributedDBCloudTaskMergeTest::SetForkQueryForCloudMergeSyncTest001(std::atomic<int> &count)
243 {
244     virtualCloudDb_->ForkQuery([&count](const std::string &, VBucket &) {
245         count++;
246         if (count == 1) { // taskid1
247             std::this_thread::sleep_for(std::chrono::seconds(1));
248         }
249     });
250 }
251 
GetProcessCallback(const std::function<void (DBStatus)> & checkFinish,std::mutex & callbackMutex,std::condition_variable & callbackCv,size_t & finishCount)252 SyncProcessCallback DistributedDBCloudTaskMergeTest::GetProcessCallback(
253     const std::function<void(DBStatus)> &checkFinish, std::mutex &callbackMutex,
254     std::condition_variable &callbackCv, size_t &finishCount)
255 {
256     return [checkFinish, &callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
257         for (const auto &item: process) {
258             if (item.second.process == DistributedDB::FINISHED) {
259                 if (checkFinish) {
260                     checkFinish(item.second.errCode);
261                 }
262                 {
263                     std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
264                     finishCount++;
265                 }
266                 LOGW("current finish %zu", finishCount);
267                 callbackCv.notify_one();
268             }
269         }
270     };
271 }
272 
273 /**
274  * @tc.name: CloudSyncMergeTaskTest001
275  * @tc.desc: test merge sync task
276  * @tc.type: FUNC
277  * @tc.require:
278  * @tc.author: chenchaohao
279  */
280 HWTEST_F(DistributedDBCloudTaskMergeTest, CloudSyncMergeTaskTest001, TestSize.Level1)
281 {
282     /**
283      * @tc.steps:step1. insert user table record.
284      * @tc.expected: step1. ok.
285      */
286     const int actualCount = 10; // 10 is count of records
287     InsertUserTableRecord(tableNameA_, actualCount);
288     InsertUserTableRecord(tableNameB_, actualCount);
289     /**
290      * @tc.steps:step2. set callback to check during sync.
291      * @tc.expected: step2. ok.
292      */
293     std::atomic<int> count = 0;
294     SetForkQueryForCloudMergeSyncTest001(count);
295 
296     Query normalQuery1 = Query::Select().FromTable({ tableNameA_ });
297     CloudSyncOption option;
298     PrepareOption(option, normalQuery1, false);
299     ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
300 
301     std::mutex callbackMutex;
302     std::condition_variable callbackCv;
303     size_t finishCount = 0u;
304     auto callback1 = GetProcessCallback(nullptr, callbackMutex, callbackCv, finishCount);
305 
306     Query normalQuery2 = Query::Select().FromTable({ tableNameB_ });
307     PrepareOption(option, normalQuery2, true);
308     ASSERT_EQ(delegate_->Sync(option, callback1), OK);
309 
310     InsertUserTableRecord(tableNameC_, actualCount);
311     InsertUserTableRecord(tableNameD_, actualCount);
312 
313     Query normalQuery3 = Query::Select().FromTable({ tableNameC_, tableNameD_ });
314     PrepareOption(option, normalQuery3, true);
315     ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
316 
317     Query normalQuery4 = Query::Select().FromTable({ tableNameB_, tableNameC_, tableNameD_ });
318     PrepareOption(option, normalQuery4, true);
319     ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
320     std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon5d84cf700402() 321     callbackCv.wait(callbackLock, [&finishCount]() {
322         return (finishCount >= 1u);
323     });
324     CheckCloudTableCount({ tableNameB_, tableNameC_, tableNameD_ }, actualCount);
325 }
326 
327 /**
328  * @tc.name: CloudSyncMergeTaskTest002
329  * @tc.desc: test merge sync task with different mode.
330  * @tc.type: FUNC
331  * @tc.require:
332  * @tc.author: liaoyonghuang
333  */
334 HWTEST_F(DistributedDBCloudTaskMergeTest, CloudSyncMergeTaskTest002, TestSize.Level1)
335 {
336     /**
337      * @tc.steps:step1. insert user table record.
338      * @tc.expected: step1. ok.
339      */
340     const int actualCount = 10; // 10 is count of records
341     InsertUserTableRecord(tableNameA_, actualCount);
342     Query normalQuery1 = Query::Select().FromTable({ tableNameA_ });
343     CloudSyncOption option;
344     PrepareOption(option, normalQuery1, true);
345     /**
346      * @tc.steps:step2. set 2s block time for sync task 1st, and start sync task 2nd.
347      * @tc.expected: step2. ok.
348      */
349     virtualCloudDb_->SetBlockTime(2000); // block 1st sync task 2s.
__anon5d84cf700502() 350     std::thread syncThread1([&]() {
351         ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
352     });
353     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
__anon5d84cf700602() 354     std::thread syncThread2([&]() {
355         ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
356     });
357     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
358 
359     /**
360      * @tc.steps:step3. start sync task 3rd.
361      * @tc.expected: task CLOUD_SYNC_TASK_MERGED because it was merged into Task 2.
362      */
__anon5d84cf700702(const std::map<std::string, SyncProcess> &process) 363     auto callback3 = [](const std::map<std::string, SyncProcess> &process) {
364         for (const auto &item: process) {
365             ASSERT_EQ(item.second.errCode, CLOUD_SYNC_TASK_MERGED);
366         }
367     };
__anon5d84cf700802() 368     std::thread syncThread3([&]() {
369         ASSERT_EQ(delegate_->Sync(option, callback3), OK);
370     });
371     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
372 
373     /**
374      * @tc.steps:step4. start sync task 4th.
375      * @tc.expected: task was not merged because the mode is not SYNC_MODE_CLOUD_MERGE.
376      */
__anon5d84cf700902(const std::map<std::string, SyncProcess> &process) 377     auto callback4 = [](const std::map<std::string, SyncProcess> &process) {
378         for (const auto &item: process) {
379             ASSERT_EQ(item.second.errCode, OK);
380         }
381     };
__anon5d84cf700a02() 382     std::thread syncThread4([&]() {
383         option.mode = SYNC_MODE_CLOUD_FORCE_PUSH;
384         ASSERT_EQ(delegate_->Sync(option, callback4), OK);
385     });
386     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
387 
388     /**
389      * @tc.steps:step5. start sync task 5th.
390      * @tc.expected: task CLOUD_SYNC_TASK_MERGED because it was merged into Task 2.
391      */
__anon5d84cf700b02(const std::map<std::string, SyncProcess> &process) 392     auto callback5 = [](const std::map<std::string, SyncProcess> &process) {
393         for (const auto &item: process) {
394             ASSERT_EQ(item.second.errCode, CLOUD_SYNC_TASK_MERGED);
395         }
396     };
__anon5d84cf700c02() 397     std::thread syncThread5([&]() {
398         option.mode = SYNC_MODE_CLOUD_MERGE;
399         ASSERT_EQ(delegate_->Sync(option, callback5), OK);
400     });
401 
402     syncThread1.join();
403     syncThread2.join();
404     syncThread3.join();
405     syncThread4.join();
406     syncThread5.join();
407 }
408 
409 /**
410  * @tc.name: CloudSyncMergeTaskTest003
411  * @tc.desc: test merge sync task which merge is false.
412  * @tc.type: FUNC
413  * @tc.require:
414  * @tc.author: liaoyonghuang
415  */
416 HWTEST_F(DistributedDBCloudTaskMergeTest, CloudSyncMergeTaskTest003, TestSize.Level1)
417 {
418     /**
419      * @tc.steps:step1. insert user table record.
420      * @tc.expected: step1. ok.
421      */
422     const int actualCount = 10; // 10 is count of records
423     InsertUserTableRecord(tableNameA_, actualCount);
424     Query normalQuery1 = Query::Select().FromTable({ tableNameA_ });
425     CloudSyncOption option;
426     PrepareOption(option, normalQuery1, true);
427     /**
428      * @tc.steps:step2. set 2s block time for sync task 1st, and start sync task 2nd.
429      * @tc.expected: step2. ok.
430      */
431     virtualCloudDb_->SetBlockTime(2000); // block 1st sync task 2s.
__anon5d84cf700d02() 432     std::thread syncThread1([&]() {
433         ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
434     });
435     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
__anon5d84cf700e02() 436     std::thread syncThread2([&]() {
437         ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
438     });
439     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
440     /**
441      * @tc.steps:step3. start sync task 3rd.
442      * @tc.expected: task CLOUD_SYNC_TASK_MERGED because it was merged into Task 2.
443      */
__anon5d84cf700f02(const std::map<std::string, SyncProcess> &process) 444     auto callback3 = [](const std::map<std::string, SyncProcess> &process) {
445         for (const auto &item: process) {
446             ASSERT_EQ(item.second.errCode, CLOUD_SYNC_TASK_MERGED);
447             EXPECT_EQ(item.second.tableProcess.size(), 1u);
448             for (const auto &table : item.second.tableProcess) {
449                 EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
450             }
451         }
452     };
__anon5d84cf701002() 453     std::thread syncThread3([&]() {
454         ASSERT_EQ(delegate_->Sync(option, callback3), OK);
455     });
456     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
457     /**
458      * @tc.steps:step4. start sync task 4th.
459      * @tc.expected: task OK because it cannot be merged.
460      */
__anon5d84cf701102(const std::map<std::string, SyncProcess> &process) 461     auto callback4 = [](const std::map<std::string, SyncProcess> &process) {
462         for (const auto &item: process) {
463             ASSERT_EQ(item.second.errCode, OK);
464         }
465     };
__anon5d84cf701202() 466     std::thread syncThread4([&]() {
467         option.merge = false;
468         ASSERT_EQ(delegate_->Sync(option, callback4), OK);
469     });
470 
471     syncThread1.join();
472     syncThread2.join();
473     syncThread3.join();
474     syncThread4.join();
475 }
476 
477 /**
478  * @tc.name: CloudSyncMergeTaskTest004
479  * @tc.desc: test merge sync task with async download asset
480  * @tc.type: FUNC
481  * @tc.require:
482  * @tc.author: zqq
483  */
484 HWTEST_F(DistributedDBCloudTaskMergeTest, CloudSyncMergeTaskTest004, TestSize.Level1)
485 {
486     size_t finishCount = 0u;
487     std::mutex callbackMutex;
488     std::condition_variable callbackCv;
__anon5d84cf701302(DBStatus status) 489     auto callback = GetProcessCallback([](DBStatus status) {
490         EXPECT_EQ(status, OK);
491     }, callbackMutex, callbackCv, finishCount);
492 
493     Query normalQuery = Query::Select().FromTable({ tableNameA_ });
494     CloudSyncOption option;
495     PrepareOption(option, normalQuery, true);
496     ASSERT_EQ(delegate_->Sync(option, callback), OK);
497     option.asyncDownloadAssets = true;
498     ASSERT_EQ(delegate_->Sync(option, callback), OK);
499 
500     std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anon5d84cf701402() 501     callbackCv.wait(callbackLock, [&finishCount]() {
502         return (finishCount >= 2u); // download 2 times
503     });
504 }
505 }
506 #endif