• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "rdb_general_ut.h"
17 #include "relational_store_delegate_impl.h"
18 #include "single_ver_relational_syncer.h"
19 #include "sqlite_relational_store.h"
20 #include "sqlite_relational_store_connection.h"
21 #include "sync_able_engine.h"
22 #include "mock_sync_engine.h"
23 
24 namespace DistributedDB {
25 using namespace testing::ext;
26 using namespace DistributedDB;
27 using namespace DistributedDBUnitTest;
28 
29 class DistributedDBRDBCompressTest : public RDBGeneralUt {
30 public:
31     void SetUp() override;
32 protected:
33     void InitOption(DistributedTableMode tableMode, bool isNeedCompressOnSync, uint8_t compressRate);
34     void InitInfo(const StoreInfo &info, const char *dev, DistributedTableMode tableMode,
35         bool isNeedCompressOnSync, uint8_t compressRate);
36     void SimpleSyncTest(bool store1Compress, bool store2Compress, uint8_t store1Rate, uint8_t store2Rate,
37         SyncMode mode);
38     void CompressTest(bool store1Compress, bool store2Compress, uint8_t store1Rate,
39         uint8_t store2Rate, SyncMode mode);
40     static UtDateBaseSchemaInfo GetDefaultSchema();
41     static UtTableSchemaInfo GetTableSchema(const std::string &table);
42     static std::string GetInsertSQL();
43     static std::string GetConditionSQL();
44     static constexpr const char *DEVICE_SYNC_TABLE = "DEVICE_SYNC_TABLE";
45     static constexpr const char *DEVICE_A = "DEVICE_A";
46     static constexpr const char *DEVICE_B = "DEVICE_B";
47     const StoreInfo info1_ = {USER_ID, APP_ID, STORE_ID_1};
48     const StoreInfo info2_ = {USER_ID, APP_ID, STORE_ID_2};
49     std::atomic<DistributedTableMode> mode_ = DistributedTableMode::COLLABORATION;
50 };
51 
SetUp()52 void DistributedDBRDBCompressTest::SetUp()
53 {
54     mode_ = DistributedTableMode::COLLABORATION;
55     RDBGeneralUt::SetUp();
56     SetSchemaInfo(info1_, GetDefaultSchema());
57     SetSchemaInfo(info2_, GetDefaultSchema());
58 }
59 
InitOption(DistributedTableMode tableMode,bool isNeedCompressOnSync,uint8_t compressRate)60 void DistributedDBRDBCompressTest::InitOption(DistributedTableMode tableMode, bool isNeedCompressOnSync,
61     uint8_t compressRate)
62 {
63     RelationalStoreDelegate::Option option;
64     option.tableMode = tableMode;
65     option.isNeedCompressOnSync = isNeedCompressOnSync;
66     option.compressionRate = compressRate;
67     SetOption(option);
68 }
69 
InitInfo(const StoreInfo & info,const char * dev,DistributedTableMode tableMode,bool isNeedCompressOnSync,uint8_t compressRate)70 void DistributedDBRDBCompressTest::InitInfo(const StoreInfo &info, const char *dev, DistributedTableMode tableMode,
71     bool isNeedCompressOnSync, uint8_t compressRate)
72 {
73     InitOption(tableMode, isNeedCompressOnSync, compressRate);
74     ASSERT_EQ(BasicUnitTest::InitDelegate(info, dev), E_OK);
75     ASSERT_EQ(SetDistributedTables(info, {DEVICE_SYNC_TABLE}), E_OK);
76 }
77 
GetDefaultSchema()78 UtDateBaseSchemaInfo DistributedDBRDBCompressTest::GetDefaultSchema()
79 {
80     UtDateBaseSchemaInfo info;
81     info.tablesInfo.push_back(GetTableSchema(DEVICE_SYNC_TABLE));
82     return info;
83 }
84 
GetTableSchema(const std::string & table)85 UtTableSchemaInfo DistributedDBRDBCompressTest::GetTableSchema(const std::string &table)
86 {
87     UtTableSchemaInfo tableSchema;
88     tableSchema.name = table;
89     UtFieldInfo field;
90     field.field.colName = "id";
91     field.field.type = TYPE_INDEX<int64_t>;
92     field.field.primary = true;
93     tableSchema.fieldInfo.push_back(field);
94     field.field.colName = "val";
95     field.field.type = TYPE_INDEX<std::string>;
96     field.field.primary = false;
97     tableSchema.fieldInfo.push_back(field);
98     return tableSchema;
99 }
100 
GetInsertSQL()101 std::string DistributedDBRDBCompressTest::GetInsertSQL()
102 {
103     std::string sql = "INSERT OR REPLACE INTO ";
104     sql.append(DEVICE_SYNC_TABLE).append("(id, val) VALUES(1, '");
105     for (size_t i = 0; i < 1000; ++i) { // 1000 is data len
106         sql.append("v");
107     }
108     sql.append("')");
109     return sql;
110 }
111 
GetConditionSQL()112 std::string DistributedDBRDBCompressTest::GetConditionSQL()
113 {
114     std::string condition = " id=1 AND val='";
115     for (size_t i = 0; i < 1000; ++i) { // 1000 is data len
116         condition.append("v");
117     }
118     condition.append("' ");
119     return condition;
120 }
121 
SimpleSyncTest(bool store1Compress,bool store2Compress,uint8_t store1Rate,uint8_t store2Rate,SyncMode mode)122 void DistributedDBRDBCompressTest::SimpleSyncTest(bool store1Compress, bool store2Compress, uint8_t store1Rate,
123     uint8_t store2Rate, SyncMode mode)
124 {
125     LOGI("SimpleSyncTest begin store1Compress %d store2Compress %d store1Rate %" PRIu8 " store2Rate %" PRIu8,
126         store1Compress, store2Compress, store1Rate, store2Rate);
127     RDBGeneralUt::CloseAllDelegate();
128     SetSchemaInfo(info1_, GetDefaultSchema());
129     SetSchemaInfo(info2_, GetDefaultSchema());
130     /**
131      * @tc.steps: step1. Init store1 and store2.
132      * @tc.expected: step1. Ok
133      */
134     ASSERT_NO_FATAL_FAILURE(InitInfo(info1_, DEVICE_A, mode_, store1Compress,
135         store1Rate));
136     ASSERT_NO_FATAL_FAILURE(InitInfo(info2_, DEVICE_B, mode_, store2Compress,
137         store2Rate));
138     /**
139      * @tc.steps: step2. Store1 insert local data.
140      * @tc.expected: step2. Ok
141      */
142     ExecuteSQL(GetInsertSQL(), info1_);
143     /**
144      * @tc.steps: step3. DEV_A sync to DEV_B.
145      * @tc.expected: step3. Ok
146      */
147     if (mode == SYNC_MODE_PUSH_ONLY) {
148         ASSERT_NO_FATAL_FAILURE(BlockPush(info1_, info2_, DEVICE_SYNC_TABLE));
149     } else if (mode == SYNC_MODE_PULL_ONLY) {
150         ASSERT_NO_FATAL_FAILURE(BlockPull(info2_, info1_, DEVICE_SYNC_TABLE));
151     }
152     /**
153      * @tc.steps: step4. Check store2's data.
154      * @tc.expected: step4. Exist 1 row
155      */
156     if (mode_ == DistributedTableMode::COLLABORATION) {
157         EXPECT_EQ(CountTableData(info2_, DEVICE_SYNC_TABLE, GetConditionSQL()), 1);
158     } else if (mode_ == DistributedTableMode::SPLIT_BY_DEVICE) {
159         EXPECT_EQ(CountTableData(info2_,
160             DBCommon::GetDistributedTableName(DEVICE_A, DEVICE_SYNC_TABLE), GetConditionSQL()), 1);
161     }
162 }
163 
CompressTest(bool store1Compress,bool store2Compress,uint8_t store1Rate,uint8_t store2Rate,SyncMode mode)164 void DistributedDBRDBCompressTest::CompressTest(bool store1Compress, bool store2Compress, uint8_t store1Rate,
165     uint8_t store2Rate, SyncMode mode)
166 {
167     /**
168      * @tc.steps: step1. Store1 sync to store2 with compress.
169      * @tc.expected: step1. Sync ok
170      */
171     auto size1 = GetAllSendMsgSize();
172     SimpleSyncTest(store1Compress, store2Compress, store1Rate, store2Rate, mode); // both compress rate is 100
173     auto size2 = GetAllSendMsgSize();
174     /**
175      * @tc.steps: step2. Store1 sync to store2 without compress.
176      * @tc.expected: step2. Sync ok and
177      */
178     auto size3 = GetAllSendMsgSize();
179     SimpleSyncTest(store1Compress, store2Compress, store1Rate, store2Rate, mode); // both compress rate is 100
180     auto size4 = GetAllSendMsgSize();
181     EXPECT_LE(size2 - size1, size4 - size3);
182 }
183 
184 /**
185  * @tc.name: RDBCompressSync001
186  * @tc.desc: Test collaboration table push sync with compress.
187  * @tc.type: FUNC
188  * @tc.author: zqq
189  */
190 HWTEST_F(DistributedDBRDBCompressTest, RDBCompressSync001, TestSize.Level0)
191 {
192     ASSERT_NO_FATAL_FAILURE(CompressTest(true, true, 100, 100, SyncMode::SYNC_MODE_PUSH_ONLY));
193 }
194 
195 /**
196  * @tc.name: RDBCompressSync002
197  * @tc.desc: Test collaboration table pull sync with compress.
198  * @tc.type: FUNC
199  * @tc.author: zqq
200  */
201 HWTEST_F(DistributedDBRDBCompressTest, RDBCompressSync002, TestSize.Level0)
202 {
203     ASSERT_NO_FATAL_FAILURE(CompressTest(true, true, 100, 100, SyncMode::SYNC_MODE_PULL_ONLY));
204 }
205 
206 /**
207  * @tc.name: RDBCompressSync003
208  * @tc.desc: Test split table push sync with compress.
209  * @tc.type: FUNC
210  * @tc.author: zqq
211  */
212 HWTEST_F(DistributedDBRDBCompressTest, RDBCompressSync003, TestSize.Level1)
213 {
214     mode_ = DistributedTableMode::SPLIT_BY_DEVICE;
215     ASSERT_NO_FATAL_FAILURE(CompressTest(true, true, 100, 100, SyncMode::SYNC_MODE_PUSH_ONLY));
216 }
217 
218 /**
219  * @tc.name: RDBCompressSync004
220  * @tc.desc: Test split table pull sync with compress.
221  * @tc.type: FUNC
222  * @tc.author: zqq
223  */
224 HWTEST_F(DistributedDBRDBCompressTest, RDBCompressSync004, TestSize.Level1)
225 {
226     mode_ = DistributedTableMode::SPLIT_BY_DEVICE;
227     ASSERT_NO_FATAL_FAILURE(CompressTest(true, true, 100, 100, SyncMode::SYNC_MODE_PULL_ONLY));
228 }
229 
230 /**
231  * @tc.name: RDBCompressSync005
232  * @tc.desc: Test collaboration table push sync with compress but remote is not compress.
233  * @tc.type: FUNC
234  * @tc.author: zqq
235  */
236 HWTEST_F(DistributedDBRDBCompressTest, RDBCompressSync005, TestSize.Level1)
237 {
238     ASSERT_NO_FATAL_FAILURE(CompressTest(true, false, 100, 100, SyncMode::SYNC_MODE_PUSH_ONLY));
239 }
240 
241 /**
242  * @tc.name: RDBCompressSync006
243  * @tc.desc: Test collaboration table pull sync with compress but remote is not compress.
244  * @tc.type: FUNC
245  * @tc.author: zqq
246  */
247 HWTEST_F(DistributedDBRDBCompressTest, RDBCompressSync006, TestSize.Level1)
248 {
249     ASSERT_NO_FATAL_FAILURE(CompressTest(true, false, 100, 100, SyncMode::SYNC_MODE_PULL_ONLY));
250 }
251 
252 /**
253  * @tc.name: RDBInvalidCompress001
254  * @tc.desc: Test open store with invalid compress option.
255  * @tc.type: FUNC
256  * @tc.author: zqq
257  */
258 HWTEST_F(DistributedDBRDBCompressTest, RDBInvalidCompress001, TestSize.Level0)
259 {
260     ASSERT_NO_FATAL_FAILURE(InitInfo(info1_, DEVICE_A, mode_, true, 0));
261     RDBGeneralUt::CloseAllDelegate();
262     ASSERT_NO_FATAL_FAILURE(InitInfo(info1_, DEVICE_A, mode_, true, 101)); // compress rate is 101
263     RDBGeneralUt::CloseAllDelegate();
264     ASSERT_NO_FATAL_FAILURE(InitInfo(info1_, DEVICE_A, mode_, false, 0));
265     RDBGeneralUt::CloseAllDelegate();
266     ASSERT_NO_FATAL_FAILURE(InitInfo(info1_, DEVICE_A, mode_, false, 101)); // compress rate is 101
267 }
268 
269 /**
270  * @tc.name: RDBInvalidCompress001
271  * @tc.desc: Test open store with invalid compress option.
272  * @tc.type: FUNC
273  * @tc.author: zqq
274  */
275 HWTEST_F(DistributedDBRDBCompressTest, RDBInvalidCompress002, TestSize.Level0)
276 {
277     /**
278      * @tc.steps: step1. Open store1 with compress and rate1.
279      * @tc.expected: step1. Open ok
280      */
281     ASSERT_NO_FATAL_FAILURE(InitInfo(info1_, DEVICE_A, mode_, true, 1));
282     /**
283      * @tc.steps: step2. Open store1 with compress and rate2.
284      * @tc.expected: step2. Open failed by conflict with cache
285      */
286     InitOption(mode_, true, 2); // compress rate is 2
287     auto [errCode, delegate] = OpenRDBStore(info1_);
288     EXPECT_EQ(errCode, INVALID_ARGS);
289     EXPECT_EQ(delegate, nullptr);
290     /**
291      * @tc.steps: step3. Open store1 without compress and rate1.
292      * @tc.expected: step3. Open failed by conflict with cache
293      */
294     InitOption(mode_, false, 1); // compress rate is 1
295     std::tie(errCode, delegate) = OpenRDBStore(info1_);
296     EXPECT_EQ(errCode, INVALID_ARGS);
297     EXPECT_EQ(delegate, nullptr);
298     InitOption(mode_, true, 100); // compress rate is 100
299     std::tie(errCode, delegate) = OpenRDBStore(info1_);
300     EXPECT_EQ(errCode, INVALID_ARGS);
301     EXPECT_EQ(delegate, nullptr);
302     /**
303      * @tc.steps: step4. Open store1 without compress and rate100 after close store1.
304      * @tc.expected: step4. Open ok
305      */
306     RDBGeneralUt::CloseAllDelegate();
307     /**
308      * @tc.steps: step5. Open store1 without compress and rate100.
309      * @tc.expected: step5. Open ok
310      */
311     ASSERT_NO_FATAL_FAILURE(InitInfo(info1_, DEVICE_A, mode_, false, 100)); // compress rate is 100
312     InitOption(mode_, false, 2); // compress rate is 2
313     std::tie(errCode, delegate) = OpenRDBStore(info1_);
314     EXPECT_EQ(errCode, OK);
315     ASSERT_NE(delegate, nullptr);
316     RelationalStoreManager mgr(info1_.appId, info1_.userId);
317     mgr.CloseStore(delegate);
318 }
319 
320 /**
321  * @tc.name: RDBGetDevTaskCount001
322  * @tc.desc: Test get dev task count when sync.
323  * @tc.type: FUNC
324  * @tc.author: zqq
325  */
326 HWTEST_F(DistributedDBRDBCompressTest, RDBGetDevTaskCount001, TestSize.Level0)
327 {
328     std::atomic<bool> checkFlag = false;
__anonadd313530102(const std::string &dev, const Message *inMsg) 329     RegBeforeDispatch([this, &checkFlag](const std::string &dev, const Message *inMsg) {
330         if (dev != DEVICE_B || inMsg->GetMessageId() != QUERY_SYNC_MESSAGE) {
331             return;
332         }
333         if (checkFlag) {
334             return;
335         }
336         checkFlag = true;
337         auto store1 = GetDelegate(info1_);
338         ASSERT_NE(store1, nullptr);
339         EXPECT_GT(store1->GetDeviceSyncTaskCount(), 0);
340         auto store2 = GetDelegate(info2_);
341         ASSERT_NE(store2, nullptr);
342         EXPECT_GT(store2->GetDeviceSyncTaskCount(), 0);
343     });
344     CompressTest(true, true, 100, 100, SyncMode::SYNC_MODE_PULL_ONLY);
345     RegBeforeDispatch(nullptr);
346 }
347 
348 /**
349  * @tc.name: VerifyDeviceSyncTaskCountAfterRemoteQuery001
350  * @tc.desc: Test get device task count when remote query.
351  * @tc.type: FUNC
352  * @tc.author: zqq
353  */
354 HWTEST_F(DistributedDBRDBCompressTest, VerifyDeviceSyncTaskCountAfterRemoteQuery001, TestSize.Level0)
355 {
356     /**
357      * @tc.steps: step1. Init store1 and store2.
358      * @tc.expected: step1. Ok
359      */
360     RDBGeneralUt::CloseAllDelegate();
361     SetSchemaInfo(info1_, GetDefaultSchema());
362     SetSchemaInfo(info2_, GetDefaultSchema());
363     ASSERT_NO_FATAL_FAILURE(InitInfo(info1_, DEVICE_A, mode_, true, 100)); // compress rate is 100
364     ASSERT_NO_FATAL_FAILURE(InitInfo(info2_, DEVICE_B, mode_, true, 100)); // compress rate is 100
365     std::atomic<int> msgCount = 0;
__anonadd313530202(const std::string &dev, const Message *inMsg) 366     RegBeforeDispatch([this, &msgCount](const std::string &dev, const Message *inMsg) {
367         if (dev != DEVICE_A || inMsg->GetMessageId() != REMOTE_EXECUTE_MESSAGE) {
368             return;
369         }
370         auto store1 = GetDelegate(info1_);
371         ASSERT_NE(store1, nullptr);
372         EXPECT_GT(store1->GetDeviceSyncTaskCount(), 0);
373         msgCount++;
374     });
375     RemoteQuery(info1_, info2_, "SELECT * FROM DEVICE_SYNC_TABLE", OK);
376     RegBeforeDispatch(nullptr);
377     EXPECT_GT(msgCount, 0);
378 }
379 
380 /**
381  * @tc.name: VerifyInvalidGetDeviceSyncTaskCount001
382  * @tc.desc: Test abnormal get device task count.
383  * @tc.type: FUNC
384  * @tc.author: zqq
385  */
386 HWTEST_F(DistributedDBRDBCompressTest, VerifyInvalidGetDeviceSyncTaskCount001, TestSize.Level0)
387 {
388     RelationalStoreDelegateImpl delegate;
389     EXPECT_EQ(delegate.GetDeviceSyncTaskCount(), 0);
390     SyncAbleEngine engine(nullptr);
391     EXPECT_EQ(engine.GetDeviceSyncTaskCount(), 0);
392     SQLiteRelationalStore store;
393     EXPECT_EQ(store.GetDeviceSyncTaskCount(), 0);
394     SQLiteRelationalStoreConnection connection(nullptr);
395     EXPECT_EQ(connection.GetDeviceSyncTaskCount(), 0);
396     MockSyncEngine syncEngine;
397     EXPECT_EQ(syncEngine.GetRemoteQueryTaskCount(), 0);
398     SingleVerRelationalSyncer syncer;
399     EXPECT_EQ(syncer.GetTaskCount(), 0);
400 }
401 }