1 /*
2 * Copyright (c) 2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "rdb_general_ut.h"
17 #include "relational_store_delegate_impl.h"
18 #include "single_ver_relational_syncer.h"
19 #include "sqlite_relational_store.h"
20 #include "sqlite_relational_store_connection.h"
21 #include "sync_able_engine.h"
22 #include "mock_sync_engine.h"
23
24 namespace DistributedDB {
25 using namespace testing::ext;
26 using namespace DistributedDB;
27 using namespace DistributedDBUnitTest;
28
29 class DistributedDBRDBCompressTest : public RDBGeneralUt {
30 public:
31 void SetUp() override;
32 protected:
33 void InitOption(DistributedTableMode tableMode, bool isNeedCompressOnSync, uint8_t compressRate);
34 void InitInfo(const StoreInfo &info, const char *dev, DistributedTableMode tableMode,
35 bool isNeedCompressOnSync, uint8_t compressRate);
36 void SimpleSyncTest(bool store1Compress, bool store2Compress, uint8_t store1Rate, uint8_t store2Rate,
37 SyncMode mode);
38 void CompressTest(bool store1Compress, bool store2Compress, uint8_t store1Rate,
39 uint8_t store2Rate, SyncMode mode);
40 static UtDateBaseSchemaInfo GetDefaultSchema();
41 static UtTableSchemaInfo GetTableSchema(const std::string &table);
42 static std::string GetInsertSQL();
43 static std::string GetConditionSQL();
44 static constexpr const char *DEVICE_SYNC_TABLE = "DEVICE_SYNC_TABLE";
45 static constexpr const char *DEVICE_A = "DEVICE_A";
46 static constexpr const char *DEVICE_B = "DEVICE_B";
47 const StoreInfo info1_ = {USER_ID, APP_ID, STORE_ID_1};
48 const StoreInfo info2_ = {USER_ID, APP_ID, STORE_ID_2};
49 std::atomic<DistributedTableMode> mode_ = DistributedTableMode::COLLABORATION;
50 };
51
SetUp()52 void DistributedDBRDBCompressTest::SetUp()
53 {
54 mode_ = DistributedTableMode::COLLABORATION;
55 RDBGeneralUt::SetUp();
56 SetSchemaInfo(info1_, GetDefaultSchema());
57 SetSchemaInfo(info2_, GetDefaultSchema());
58 }
59
InitOption(DistributedTableMode tableMode,bool isNeedCompressOnSync,uint8_t compressRate)60 void DistributedDBRDBCompressTest::InitOption(DistributedTableMode tableMode, bool isNeedCompressOnSync,
61 uint8_t compressRate)
62 {
63 RelationalStoreDelegate::Option option;
64 option.tableMode = tableMode;
65 option.isNeedCompressOnSync = isNeedCompressOnSync;
66 option.compressionRate = compressRate;
67 SetOption(option);
68 }
69
InitInfo(const StoreInfo & info,const char * dev,DistributedTableMode tableMode,bool isNeedCompressOnSync,uint8_t compressRate)70 void DistributedDBRDBCompressTest::InitInfo(const StoreInfo &info, const char *dev, DistributedTableMode tableMode,
71 bool isNeedCompressOnSync, uint8_t compressRate)
72 {
73 InitOption(tableMode, isNeedCompressOnSync, compressRate);
74 ASSERT_EQ(BasicUnitTest::InitDelegate(info, dev), E_OK);
75 ASSERT_EQ(SetDistributedTables(info, {DEVICE_SYNC_TABLE}), E_OK);
76 }
77
GetDefaultSchema()78 UtDateBaseSchemaInfo DistributedDBRDBCompressTest::GetDefaultSchema()
79 {
80 UtDateBaseSchemaInfo info;
81 info.tablesInfo.push_back(GetTableSchema(DEVICE_SYNC_TABLE));
82 return info;
83 }
84
GetTableSchema(const std::string & table)85 UtTableSchemaInfo DistributedDBRDBCompressTest::GetTableSchema(const std::string &table)
86 {
87 UtTableSchemaInfo tableSchema;
88 tableSchema.name = table;
89 UtFieldInfo field;
90 field.field.colName = "id";
91 field.field.type = TYPE_INDEX<int64_t>;
92 field.field.primary = true;
93 tableSchema.fieldInfo.push_back(field);
94 field.field.colName = "val";
95 field.field.type = TYPE_INDEX<std::string>;
96 field.field.primary = false;
97 tableSchema.fieldInfo.push_back(field);
98 return tableSchema;
99 }
100
GetInsertSQL()101 std::string DistributedDBRDBCompressTest::GetInsertSQL()
102 {
103 std::string sql = "INSERT OR REPLACE INTO ";
104 sql.append(DEVICE_SYNC_TABLE).append("(id, val) VALUES(1, '");
105 for (size_t i = 0; i < 1000; ++i) { // 1000 is data len
106 sql.append("v");
107 }
108 sql.append("')");
109 return sql;
110 }
111
GetConditionSQL()112 std::string DistributedDBRDBCompressTest::GetConditionSQL()
113 {
114 std::string condition = " id=1 AND val='";
115 for (size_t i = 0; i < 1000; ++i) { // 1000 is data len
116 condition.append("v");
117 }
118 condition.append("' ");
119 return condition;
120 }
121
SimpleSyncTest(bool store1Compress,bool store2Compress,uint8_t store1Rate,uint8_t store2Rate,SyncMode mode)122 void DistributedDBRDBCompressTest::SimpleSyncTest(bool store1Compress, bool store2Compress, uint8_t store1Rate,
123 uint8_t store2Rate, SyncMode mode)
124 {
125 LOGI("SimpleSyncTest begin store1Compress %d store2Compress %d store1Rate %" PRIu8 " store2Rate %" PRIu8,
126 store1Compress, store2Compress, store1Rate, store2Rate);
127 RDBGeneralUt::CloseAllDelegate();
128 SetSchemaInfo(info1_, GetDefaultSchema());
129 SetSchemaInfo(info2_, GetDefaultSchema());
130 /**
131 * @tc.steps: step1. Init store1 and store2.
132 * @tc.expected: step1. Ok
133 */
134 ASSERT_NO_FATAL_FAILURE(InitInfo(info1_, DEVICE_A, mode_, store1Compress,
135 store1Rate));
136 ASSERT_NO_FATAL_FAILURE(InitInfo(info2_, DEVICE_B, mode_, store2Compress,
137 store2Rate));
138 /**
139 * @tc.steps: step2. Store1 insert local data.
140 * @tc.expected: step2. Ok
141 */
142 ExecuteSQL(GetInsertSQL(), info1_);
143 /**
144 * @tc.steps: step3. DEV_A sync to DEV_B.
145 * @tc.expected: step3. Ok
146 */
147 if (mode == SYNC_MODE_PUSH_ONLY) {
148 ASSERT_NO_FATAL_FAILURE(BlockPush(info1_, info2_, DEVICE_SYNC_TABLE));
149 } else if (mode == SYNC_MODE_PULL_ONLY) {
150 ASSERT_NO_FATAL_FAILURE(BlockPull(info2_, info1_, DEVICE_SYNC_TABLE));
151 }
152 /**
153 * @tc.steps: step4. Check store2's data.
154 * @tc.expected: step4. Exist 1 row
155 */
156 if (mode_ == DistributedTableMode::COLLABORATION) {
157 EXPECT_EQ(CountTableData(info2_, DEVICE_SYNC_TABLE, GetConditionSQL()), 1);
158 } else if (mode_ == DistributedTableMode::SPLIT_BY_DEVICE) {
159 EXPECT_EQ(CountTableData(info2_,
160 DBCommon::GetDistributedTableName(DEVICE_A, DEVICE_SYNC_TABLE), GetConditionSQL()), 1);
161 }
162 }
163
CompressTest(bool store1Compress,bool store2Compress,uint8_t store1Rate,uint8_t store2Rate,SyncMode mode)164 void DistributedDBRDBCompressTest::CompressTest(bool store1Compress, bool store2Compress, uint8_t store1Rate,
165 uint8_t store2Rate, SyncMode mode)
166 {
167 /**
168 * @tc.steps: step1. Store1 sync to store2 with compress.
169 * @tc.expected: step1. Sync ok
170 */
171 auto size1 = GetAllSendMsgSize();
172 SimpleSyncTest(store1Compress, store2Compress, store1Rate, store2Rate, mode); // both compress rate is 100
173 auto size2 = GetAllSendMsgSize();
174 /**
175 * @tc.steps: step2. Store1 sync to store2 without compress.
176 * @tc.expected: step2. Sync ok and
177 */
178 auto size3 = GetAllSendMsgSize();
179 SimpleSyncTest(store1Compress, store2Compress, store1Rate, store2Rate, mode); // both compress rate is 100
180 auto size4 = GetAllSendMsgSize();
181 EXPECT_LE(size2 - size1, size4 - size3);
182 }
183
184 /**
185 * @tc.name: RDBCompressSync001
186 * @tc.desc: Test collaboration table push sync with compress.
187 * @tc.type: FUNC
188 * @tc.author: zqq
189 */
190 HWTEST_F(DistributedDBRDBCompressTest, RDBCompressSync001, TestSize.Level0)
191 {
192 ASSERT_NO_FATAL_FAILURE(CompressTest(true, true, 100, 100, SyncMode::SYNC_MODE_PUSH_ONLY));
193 }
194
195 /**
196 * @tc.name: RDBCompressSync002
197 * @tc.desc: Test collaboration table pull sync with compress.
198 * @tc.type: FUNC
199 * @tc.author: zqq
200 */
201 HWTEST_F(DistributedDBRDBCompressTest, RDBCompressSync002, TestSize.Level0)
202 {
203 ASSERT_NO_FATAL_FAILURE(CompressTest(true, true, 100, 100, SyncMode::SYNC_MODE_PULL_ONLY));
204 }
205
206 /**
207 * @tc.name: RDBCompressSync003
208 * @tc.desc: Test split table push sync with compress.
209 * @tc.type: FUNC
210 * @tc.author: zqq
211 */
212 HWTEST_F(DistributedDBRDBCompressTest, RDBCompressSync003, TestSize.Level1)
213 {
214 mode_ = DistributedTableMode::SPLIT_BY_DEVICE;
215 ASSERT_NO_FATAL_FAILURE(CompressTest(true, true, 100, 100, SyncMode::SYNC_MODE_PUSH_ONLY));
216 }
217
218 /**
219 * @tc.name: RDBCompressSync004
220 * @tc.desc: Test split table pull sync with compress.
221 * @tc.type: FUNC
222 * @tc.author: zqq
223 */
224 HWTEST_F(DistributedDBRDBCompressTest, RDBCompressSync004, TestSize.Level1)
225 {
226 mode_ = DistributedTableMode::SPLIT_BY_DEVICE;
227 ASSERT_NO_FATAL_FAILURE(CompressTest(true, true, 100, 100, SyncMode::SYNC_MODE_PULL_ONLY));
228 }
229
230 /**
231 * @tc.name: RDBCompressSync005
232 * @tc.desc: Test collaboration table push sync with compress but remote is not compress.
233 * @tc.type: FUNC
234 * @tc.author: zqq
235 */
236 HWTEST_F(DistributedDBRDBCompressTest, RDBCompressSync005, TestSize.Level1)
237 {
238 ASSERT_NO_FATAL_FAILURE(CompressTest(true, false, 100, 100, SyncMode::SYNC_MODE_PUSH_ONLY));
239 }
240
241 /**
242 * @tc.name: RDBCompressSync006
243 * @tc.desc: Test collaboration table pull sync with compress but remote is not compress.
244 * @tc.type: FUNC
245 * @tc.author: zqq
246 */
247 HWTEST_F(DistributedDBRDBCompressTest, RDBCompressSync006, TestSize.Level1)
248 {
249 ASSERT_NO_FATAL_FAILURE(CompressTest(true, false, 100, 100, SyncMode::SYNC_MODE_PULL_ONLY));
250 }
251
252 /**
253 * @tc.name: RDBInvalidCompress001
254 * @tc.desc: Test open store with invalid compress option.
255 * @tc.type: FUNC
256 * @tc.author: zqq
257 */
258 HWTEST_F(DistributedDBRDBCompressTest, RDBInvalidCompress001, TestSize.Level0)
259 {
260 ASSERT_NO_FATAL_FAILURE(InitInfo(info1_, DEVICE_A, mode_, true, 0));
261 RDBGeneralUt::CloseAllDelegate();
262 ASSERT_NO_FATAL_FAILURE(InitInfo(info1_, DEVICE_A, mode_, true, 101)); // compress rate is 101
263 RDBGeneralUt::CloseAllDelegate();
264 ASSERT_NO_FATAL_FAILURE(InitInfo(info1_, DEVICE_A, mode_, false, 0));
265 RDBGeneralUt::CloseAllDelegate();
266 ASSERT_NO_FATAL_FAILURE(InitInfo(info1_, DEVICE_A, mode_, false, 101)); // compress rate is 101
267 }
268
/**
 * @tc.name: RDBInvalidCompress002
 * @tc.desc: Test opening an already opened store again with a conflicting compress option.
 * @tc.type: FUNC
 * @tc.author: zqq
 */
275 HWTEST_F(DistributedDBRDBCompressTest, RDBInvalidCompress002, TestSize.Level0)
276 {
277 /**
278 * @tc.steps: step1. Open store1 with compress and rate1.
279 * @tc.expected: step1. Open ok
280 */
281 ASSERT_NO_FATAL_FAILURE(InitInfo(info1_, DEVICE_A, mode_, true, 1));
282 /**
283 * @tc.steps: step2. Open store1 with compress and rate2.
284 * @tc.expected: step2. Open failed by conflict with cache
285 */
286 InitOption(mode_, true, 2); // compress rate is 2
287 auto [errCode, delegate] = OpenRDBStore(info1_);
288 EXPECT_EQ(errCode, INVALID_ARGS);
289 EXPECT_EQ(delegate, nullptr);
290 /**
291 * @tc.steps: step3. Open store1 without compress and rate1.
292 * @tc.expected: step3. Open failed by conflict with cache
293 */
294 InitOption(mode_, false, 1); // compress rate is 1
295 std::tie(errCode, delegate) = OpenRDBStore(info1_);
296 EXPECT_EQ(errCode, INVALID_ARGS);
297 EXPECT_EQ(delegate, nullptr);
298 InitOption(mode_, true, 100); // compress rate is 100
299 std::tie(errCode, delegate) = OpenRDBStore(info1_);
300 EXPECT_EQ(errCode, INVALID_ARGS);
301 EXPECT_EQ(delegate, nullptr);
302 /**
303 * @tc.steps: step4. Open store1 without compress and rate100 after close store1.
304 * @tc.expected: step4. Open ok
305 */
306 RDBGeneralUt::CloseAllDelegate();
307 /**
308 * @tc.steps: step5. Open store1 without compress and rate100.
309 * @tc.expected: step5. Open ok
310 */
311 ASSERT_NO_FATAL_FAILURE(InitInfo(info1_, DEVICE_A, mode_, false, 100)); // compress rate is 100
312 InitOption(mode_, false, 2); // compress rate is 2
313 std::tie(errCode, delegate) = OpenRDBStore(info1_);
314 EXPECT_EQ(errCode, OK);
315 ASSERT_NE(delegate, nullptr);
316 RelationalStoreManager mgr(info1_.appId, info1_.userId);
317 mgr.CloseStore(delegate);
318 }
319
320 /**
321 * @tc.name: RDBGetDevTaskCount001
322 * @tc.desc: Test get dev task count when sync.
323 * @tc.type: FUNC
324 * @tc.author: zqq
325 */
326 HWTEST_F(DistributedDBRDBCompressTest, RDBGetDevTaskCount001, TestSize.Level0)
327 {
328 std::atomic<bool> checkFlag = false;
__anonadd313530102(const std::string &dev, const Message *inMsg) 329 RegBeforeDispatch([this, &checkFlag](const std::string &dev, const Message *inMsg) {
330 if (dev != DEVICE_B || inMsg->GetMessageId() != QUERY_SYNC_MESSAGE) {
331 return;
332 }
333 if (checkFlag) {
334 return;
335 }
336 checkFlag = true;
337 auto store1 = GetDelegate(info1_);
338 ASSERT_NE(store1, nullptr);
339 EXPECT_GT(store1->GetDeviceSyncTaskCount(), 0);
340 auto store2 = GetDelegate(info2_);
341 ASSERT_NE(store2, nullptr);
342 EXPECT_GT(store2->GetDeviceSyncTaskCount(), 0);
343 });
344 CompressTest(true, true, 100, 100, SyncMode::SYNC_MODE_PULL_ONLY);
345 RegBeforeDispatch(nullptr);
346 }
347
348 /**
349 * @tc.name: VerifyDeviceSyncTaskCountAfterRemoteQuery001
350 * @tc.desc: Test get device task count when remote query.
351 * @tc.type: FUNC
352 * @tc.author: zqq
353 */
354 HWTEST_F(DistributedDBRDBCompressTest, VerifyDeviceSyncTaskCountAfterRemoteQuery001, TestSize.Level0)
355 {
356 /**
357 * @tc.steps: step1. Init store1 and store2.
358 * @tc.expected: step1. Ok
359 */
360 RDBGeneralUt::CloseAllDelegate();
361 SetSchemaInfo(info1_, GetDefaultSchema());
362 SetSchemaInfo(info2_, GetDefaultSchema());
363 ASSERT_NO_FATAL_FAILURE(InitInfo(info1_, DEVICE_A, mode_, true, 100)); // compress rate is 100
364 ASSERT_NO_FATAL_FAILURE(InitInfo(info2_, DEVICE_B, mode_, true, 100)); // compress rate is 100
365 std::atomic<int> msgCount = 0;
__anonadd313530202(const std::string &dev, const Message *inMsg) 366 RegBeforeDispatch([this, &msgCount](const std::string &dev, const Message *inMsg) {
367 if (dev != DEVICE_A || inMsg->GetMessageId() != REMOTE_EXECUTE_MESSAGE) {
368 return;
369 }
370 auto store1 = GetDelegate(info1_);
371 ASSERT_NE(store1, nullptr);
372 EXPECT_GT(store1->GetDeviceSyncTaskCount(), 0);
373 msgCount++;
374 });
375 RemoteQuery(info1_, info2_, "SELECT * FROM DEVICE_SYNC_TABLE", OK);
376 RegBeforeDispatch(nullptr);
377 EXPECT_GT(msgCount, 0);
378 }
379
380 /**
381 * @tc.name: VerifyInvalidGetDeviceSyncTaskCount001
382 * @tc.desc: Test abnormal get device task count.
383 * @tc.type: FUNC
384 * @tc.author: zqq
385 */
386 HWTEST_F(DistributedDBRDBCompressTest, VerifyInvalidGetDeviceSyncTaskCount001, TestSize.Level0)
387 {
388 RelationalStoreDelegateImpl delegate;
389 EXPECT_EQ(delegate.GetDeviceSyncTaskCount(), 0);
390 SyncAbleEngine engine(nullptr);
391 EXPECT_EQ(engine.GetDeviceSyncTaskCount(), 0);
392 SQLiteRelationalStore store;
393 EXPECT_EQ(store.GetDeviceSyncTaskCount(), 0);
394 SQLiteRelationalStoreConnection connection(nullptr);
395 EXPECT_EQ(connection.GetDeviceSyncTaskCount(), 0);
396 MockSyncEngine syncEngine;
397 EXPECT_EQ(syncEngine.GetRemoteQueryTaskCount(), 0);
398 SingleVerRelationalSyncer syncer;
399 EXPECT_EQ(syncer.GetTaskCount(), 0);
400 }
401 }