1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <condition_variable>
17 #include <gtest/gtest.h>
18 #include <thread>
19
20 #include "db_constant.h"
21 #include "db_common.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "platform_specific.h"
27 #include "runtime_config.h"
28 #include "single_ver_data_sync.h"
29 #include "single_ver_kv_sync_task_context.h"
30
31 using namespace testing::ext;
32 using namespace DistributedDB;
33 using namespace DistributedDBUnitTest;
34 using namespace std;
35
36 namespace {
37 string g_testDir;
38 const string STORE_ID = "kv_stroe_sync_test";
39 const int64_t TIME_OFFSET = 5000000;
40 const int WAIT_TIME = 1000;
41 const std::string DEVICE_A = "real_device";
42 const std::string DEVICE_B = "deviceB";
43 const std::string DEVICE_C = "deviceC";
44
45 KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
46 KvStoreConfig g_config;
47 DistributedDBToolsUnitTest g_tool;
48 DBStatus g_kvDelegateStatus = INVALID_ARGS;
49 KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
50 VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
51 KvVirtualDevice *g_deviceB = nullptr;
52 KvVirtualDevice *g_deviceC = nullptr;
53
54 // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
55 auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
56 placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
57
CalculateDataTest(uint32_t itemCount,uint32_t keySize,uint32_t valueSize)58 void CalculateDataTest(uint32_t itemCount, uint32_t keySize, uint32_t valueSize)
59 {
60 for (uint32_t i = 0; i < itemCount; i++) {
61 std::vector<uint8_t> prefixKey = {'a', 'b', 'c'};
62 Key key = DistributedDBToolsUnitTest::GetRandPrefixKey(prefixKey, keySize);
63 Value value;
64 DistributedDBToolsUnitTest::GetRandomKeyValue(value, valueSize);
65 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
66 }
67 size_t dataSize = g_kvDelegatePtr->GetSyncDataSize(DEVICE_B);
68 uint32_t expectedDataSize = (valueSize + keySize);
69 uint32_t externalSize = 70u;
70 uint32_t serialHeadLen = 8u;
71 LOGI("expectedDataSize=%u, v=%u", expectedDataSize, externalSize);
72 uint32_t maxDataSize = 1024u * 1024u;
73 if (itemCount * expectedDataSize >= maxDataSize) {
74 EXPECT_EQ(static_cast<uint32_t>(dataSize), maxDataSize);
75 return;
76 }
77 ASSERT_GE(static_cast<uint32_t>(dataSize), itemCount * expectedDataSize);
78 ASSERT_LE(static_cast<uint32_t>(dataSize), serialHeadLen + itemCount * (expectedDataSize + externalSize));
79 }
80
81 class DistributedDBSingleVerP2PSimpleSyncTest : public testing::Test {
82 public:
83 static void SetUpTestCase(void);
84 static void TearDownTestCase(void);
85 void SetUp();
86 void TearDown();
87 };
88
SetUpTestCase(void)89 void DistributedDBSingleVerP2PSimpleSyncTest::SetUpTestCase(void)
90 {
91 /**
92 * @tc.setup: Init datadir and Virtual Communicator.
93 */
94 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
95 g_config.dataDir = g_testDir;
96 g_mgr.SetKvStoreConfig(g_config);
97
98 string dir = g_testDir + "/single_ver";
99 DIR* dirTmp = opendir(dir.c_str());
100 if (dirTmp == nullptr) {
101 OS::MakeDBDirectory(dir);
102 } else {
103 closedir(dirTmp);
104 }
105
106 g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
107 ASSERT_TRUE(g_communicatorAggregator != nullptr);
108 RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
109 }
110
TearDownTestCase(void)111 void DistributedDBSingleVerP2PSimpleSyncTest::TearDownTestCase(void)
112 {
113 /**
114 * @tc.teardown: Release virtual Communicator and clear data dir.
115 */
116 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
117 LOGE("rm test db files error!");
118 }
119 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
120 }
121
SetUp(void)122 void DistributedDBSingleVerP2PSimpleSyncTest::SetUp(void)
123 {
124 DistributedDBToolsUnitTest::PrintTestCaseInfo();
125 /**
126 * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
127 */
128 KvStoreNbDelegate::Option option;
129 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
130 ASSERT_TRUE(g_kvDelegateStatus == OK);
131 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
132 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
133 ASSERT_TRUE(g_deviceB != nullptr);
134 VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
135 ASSERT_TRUE(syncInterfaceB != nullptr);
136 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
137
138 g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
139 ASSERT_TRUE(g_deviceC != nullptr);
140 VirtualSingleVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
141 ASSERT_TRUE(syncInterfaceC != nullptr);
142 ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
143
144 auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
145 const std::string &deviceId, uint8_t flag) -> bool {
146 return true;
147 };
148 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
149 }
150
TearDown(void)151 void DistributedDBSingleVerP2PSimpleSyncTest::TearDown(void)
152 {
153 /**
154 * @tc.teardown: Release device A, B, C
155 */
156 if (g_kvDelegatePtr != nullptr) {
157 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
158 g_kvDelegatePtr = nullptr;
159 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
160 LOGD("delete kv store status %d", status);
161 ASSERT_TRUE(status == OK);
162 }
163 if (g_deviceB != nullptr) {
164 delete g_deviceB;
165 g_deviceB = nullptr;
166 }
167 if (g_deviceC != nullptr) {
168 delete g_deviceC;
169 g_deviceC = nullptr;
170 }
171 PermissionCheckCallbackV2 nullCallback;
172 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
173 }
174
CheckWatermark(const std::string & dev,KvStoreNbDelegate * kvDelegatePtr,WatermarkInfo expectInfo,bool sendEqual=true,bool receiveEqual=true)175 void CheckWatermark(const std::string &dev, KvStoreNbDelegate *kvDelegatePtr, WatermarkInfo expectInfo,
176 bool sendEqual = true, bool receiveEqual = true)
177 {
178 auto [status, watermarkInfo] = kvDelegatePtr->GetWatermarkInfo(dev);
179 EXPECT_EQ(status, OK);
180 if (sendEqual) {
181 EXPECT_EQ(watermarkInfo.sendMark, expectInfo.sendMark);
182 } else {
183 EXPECT_NE(watermarkInfo.sendMark, expectInfo.sendMark);
184 }
185 if (receiveEqual) {
186 EXPECT_EQ(watermarkInfo.receiveMark, expectInfo.receiveMark);
187 } else {
188 EXPECT_NE(watermarkInfo.receiveMark, expectInfo.receiveMark);
189 }
190 }
191
192 /**
193 * @tc.name: Normal Sync 001
194 * @tc.desc: Test normal push sync for add data.
195 * @tc.type: FUNC
196 * @tc.require: AR000CQS3S SR000CQE0B
197 * @tc.author: xushaohua
198 */
199 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync001, TestSize.Level1)
200 {
201 DBStatus status = OK;
202 std::vector<std::string> devices;
203 devices.push_back(g_deviceB->GetDeviceId());
204 devices.push_back(g_deviceC->GetDeviceId());
205
206 /**
207 * @tc.steps: step1. deviceA put {k1, v1}
208 */
209 Key key = {'1'};
210 Value value = {'1'};
211
212 WatermarkInfo info;
213 CheckWatermark(g_deviceB->GetDeviceId(), g_kvDelegatePtr, info);
214 CheckWatermark(g_deviceC->GetDeviceId(), g_kvDelegatePtr, info);
215 status = g_kvDelegatePtr->Put(key, value);
216 ASSERT_TRUE(status == OK);
217
218 /**
219 * @tc.steps: step2. deviceA call sync and wait
220 * @tc.expected: step2. sync should return OK.
221 */
222 std::map<std::string, DBStatus> result;
223 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
224 ASSERT_TRUE(status == OK);
225 CheckWatermark(g_deviceB->GetDeviceId(), g_kvDelegatePtr, info, false);
226 CheckWatermark(g_deviceC->GetDeviceId(), g_kvDelegatePtr, info, false);
227
228 /**
229 * @tc.expected: step2. onComplete should be called, DeviceB,C have {k1,v1}
230 */
231 ASSERT_TRUE(result.size() == devices.size());
232 for (const auto &pair : result) {
233 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
234 EXPECT_TRUE(pair.second == OK);
235 }
236 VirtualDataItem item;
237 g_deviceB->GetData(key, item);
238 EXPECT_TRUE(item.value == value);
239 g_deviceC->GetData(key, item);
240 EXPECT_TRUE(item.value == value);
241 }
242
243 /**
244 * @tc.name: Normal Sync 002
245 * @tc.desc: Test normal push sync for update data.
246 * @tc.type: FUNC
247 * @tc.require: AR000CCPOM
248 * @tc.author: xushaohua
249 */
250 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync002, TestSize.Level1)
251 {
252 DBStatus status = OK;
253 std::vector<std::string> devices;
254 devices.push_back(g_deviceB->GetDeviceId());
255 devices.push_back(g_deviceC->GetDeviceId());
256
257 /**
258 * @tc.steps: step1. deviceA put {k1, v1}
259 */
260 Key key = {'1'};
261 Value value = {'1'};
262 status = g_kvDelegatePtr->Put(key, value);
263 ASSERT_TRUE(status == OK);
264
265 /**
266 * @tc.steps: step2. deviceA put {k1, v2}
267 */
268 Value value2;
269 value2.push_back('2');
270 status = g_kvDelegatePtr->Put(key, value2);
271 ASSERT_TRUE(status == OK);
272
273 /**
274 * @tc.steps: step3. deviceA call sync and wait
275 * @tc.expected: step3. sync should return OK.
276 */
277 std::map<std::string, DBStatus> result;
278 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
279 ASSERT_TRUE(status == OK);
280
281 /**
282 * @tc.expected: step3. onComplete should be called, DeviceB,C have {k1,v2}
283 */
284 ASSERT_TRUE(result.size() == devices.size());
285 for (const auto &pair : result) {
286 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
287 EXPECT_TRUE(pair.second == OK);
288 }
289 VirtualDataItem item;
290 g_deviceC->GetData(key, item);
291 EXPECT_TRUE(item.value == value2);
292 g_deviceB->GetData(key, item);
293 EXPECT_TRUE(item.value == value2);
294 }
295
296 /**
297 * @tc.name: Normal Sync 003
298 * @tc.desc: Test normal push sync for delete data.
299 * @tc.type: FUNC
300 * @tc.require: AR000CQS3S
301 * @tc.author: xushaohua
302 */
303 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync003, TestSize.Level1)
304 {
305 DBStatus status = OK;
306 std::vector<std::string> devices;
307 devices.push_back(g_deviceB->GetDeviceId());
308 devices.push_back(g_deviceC->GetDeviceId());
309
310 /**
311 * @tc.steps: step1. deviceA put {k1, v1}
312 */
313 Key key = {'1'};
314 Value value = {'1'};
315 status = g_kvDelegatePtr->Put(key, value);
316 ASSERT_TRUE(status == OK);
317
318 /**
319 * @tc.steps: step2. deviceA delete k1
320 */
321 status = g_kvDelegatePtr->Delete(key);
322 ASSERT_TRUE(status == OK);
323 std::map<std::string, DBStatus> result;
324 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
325 ASSERT_TRUE(status == OK);
326
327 /**
328 * @tc.steps: step3. deviceA call sync and wait
329 * @tc.expected: step3. sync should return OK.
330 */
331 ASSERT_TRUE(result.size() == devices.size());
332 for (const auto &pair : result) {
333 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
334 EXPECT_TRUE(pair.second == OK);
335 }
336
337 /**
338 * @tc.expected: step3. onComplete should be called, DeviceB,C have {k1, delete}
339 */
340 VirtualDataItem item;
341 Key hashKey;
342 DistributedDBToolsUnitTest::CalcHash(key, hashKey);
343 EXPECT_EQ(g_deviceB->GetData(hashKey, item), -E_NOT_FOUND);
344 EXPECT_EQ(g_deviceC->GetData(hashKey, item), -E_NOT_FOUND);
345 }
346
347 /**
348 * @tc.name: Normal Sync 004
349 * @tc.desc: Test normal pull sync for add data.
350 * @tc.type: FUNC
351 * @tc.require: AR000CCPOM
352 * @tc.author: xushaohua
353 */
354 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync004, TestSize.Level1)
355 {
356 DBStatus status = OK;
357 std::vector<std::string> devices;
358 devices.push_back(g_deviceB->GetDeviceId());
359 devices.push_back(g_deviceC->GetDeviceId());
360
361 WatermarkInfo info;
362 CheckWatermark(g_deviceB->GetDeviceId(), g_kvDelegatePtr, info);
363 CheckWatermark(g_deviceC->GetDeviceId(), g_kvDelegatePtr, info);
364 /**
365 * @tc.steps: step1. deviceB put {k1, v1}
366 */
367 Key key = {'1'};
368 Value value = {'1'};
369 g_deviceB->PutData(key, value, 0, 0);
370
371 /**
372 * @tc.steps: step2. deviceB put {k2, v2}
373 */
374 Key key2 = {'2'};
375 Value value2 = {'2'};
376 g_deviceC->PutData(key2, value2, 0, 0);
377 ASSERT_TRUE(status == OK);
378
379 /**
380 * @tc.steps: step3. deviceA call pull sync
381 * @tc.expected: step3. sync should return OK.
382 */
383 std::map<std::string, DBStatus> result;
384 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
385 ASSERT_TRUE(status == OK);
386 CheckWatermark(g_deviceB->GetDeviceId(), g_kvDelegatePtr, info, true, false);
387 CheckWatermark(g_deviceC->GetDeviceId(), g_kvDelegatePtr, info, true, false);
388
389 /**
390 * @tc.expected: step3. onComplete should be called, DeviceA have {k1, VALUE_1}, {K2. VALUE_2}
391 */
392 ASSERT_TRUE(result.size() == devices.size());
393 for (const auto &pair : result) {
394 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
395 EXPECT_TRUE(pair.second == OK);
396 }
397 Value value3;
398 EXPECT_EQ(g_kvDelegatePtr->Get(key, value3), OK);
399 EXPECT_EQ(value3, value);
400 EXPECT_EQ(g_kvDelegatePtr->Get(key2, value3), OK);
401 EXPECT_EQ(value3, value2);
402 }
403
404 /**
405 * @tc.name: Normal Sync 005
406 * @tc.desc: Test normal pull sync for update data.
407 * @tc.type: FUNC
408 * @tc.require: AR000CCPOM SR000CQE10
409 * @tc.author: xushaohua
410 */
411 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync005, TestSize.Level2)
412 {
413 DBStatus status = OK;
414 std::vector<std::string> devices;
415 devices.push_back(g_deviceB->GetDeviceId());
416 devices.push_back(g_deviceC->GetDeviceId());
417
418 /**
419 * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
420 */
421 Key key1 = {'1'};
422 Value value1 = {'1'};
423 status = g_kvDelegatePtr->Put(key1, value1);
424 ASSERT_TRUE(status == OK);
425 Key key2 = {'2'};
426 Value value2 = {'2'};
427 status = g_kvDelegatePtr->Put(key2, value2);
428 ASSERT_TRUE(status == OK);
429
430 /**
431 * @tc.steps: step2. deviceB put {k1, v3} t2, t2 > t1
432 */
433 Value value3;
434 value3.push_back('3');
435 g_deviceB->PutData(key1, value3,
436 TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 0);
437
438 /**
439 * @tc.steps: step3. deviceC put {k2, v4} t2, t4 < t1
440 */
441 Value value4;
442 value4.push_back('4');
443 g_deviceC->PutData(key2, value4,
444 TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 0);
445
446 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
447 /**
448 * @tc.steps: step4. deviceA call pull sync
449 * @tc.expected: step4. sync should return OK.
450 */
451 std::map<std::string, DBStatus> result;
452 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
453 ASSERT_TRUE(status == OK);
454
455 /**
456 * @tc.expected: step4. onComplete should be called, DeviceA have {k1, v3}, {k2. v2}
457 */
458 ASSERT_TRUE(result.size() == devices.size());
459 for (const auto &pair : result) {
460 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
461 EXPECT_TRUE(pair.second == OK);
462 }
463
464 Value value5;
465 g_kvDelegatePtr->Get(key1, value5);
466 EXPECT_TRUE(value5 == value3);
467 g_kvDelegatePtr->Get(key2, value5);
468 EXPECT_TRUE(value5 == value2);
469 }
470
471 /**
472 * @tc.name: Normal Sync 006
473 * @tc.desc: Test normal pull sync for delete data.
474 * @tc.type: FUNC
475 * @tc.require: AR000CQS3S
476 * @tc.author: xushaohua
477 */
478 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync006, TestSize.Level2)
479 {
480 /**
481 * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
482 */
483 Key key1 = {'1'};
484 Value value1 = {'1'};
485 DBStatus status = g_kvDelegatePtr->Put(key1, value1);
486 ASSERT_TRUE(status == OK);
487 Key key2 = {'2'};
488 Value value2 = {'2'};
489 status = g_kvDelegatePtr->Put(key2, value2);
490 ASSERT_TRUE(status == OK);
491
492 /**
493 * @tc.steps: step2. deviceA put {k1, delete} t2, t2 <t1
494 */
495 Key hashKey1;
496 DistributedDBToolsUnitTest::CalcHash(key1, hashKey1);
497 g_deviceB->PutData(hashKey1, value1,
498 TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 1);
499
500 /**
501 * @tc.steps: step3. deviceA put {k1, delete} t3, t3 < t1
502 */
503 Key hashKey2;
504 DistributedDBToolsUnitTest::CalcHash(key2, hashKey2);
505 g_deviceC->PutData(hashKey2, value1,
506 TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 0);
507
508 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
509 /**
510 * @tc.steps: step4. deviceA call pull sync
511 * @tc.expected: step4. sync should return OK.
512 */
513 std::map<std::string, DBStatus> result;
514 std::vector<std::string> devices;
515 devices.push_back(g_deviceB->GetDeviceId());
516 devices.push_back(g_deviceC->GetDeviceId());
517 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
518 ASSERT_TRUE(status == OK);
519
520 /**
521 * @tc.expected: step4. onComplete should be called, DeviceA have {k2. v2} don't have k1
522 */
523 ASSERT_TRUE(result.size() == devices.size());
524 for (const auto &pair : result) {
525 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
526 EXPECT_TRUE(pair.second == OK);
527 }
528 Value value5;
529 g_kvDelegatePtr->Get(key1, value5);
530 EXPECT_TRUE(value5.empty());
531 g_kvDelegatePtr->Get(key2, value5);
532 EXPECT_TRUE(value5 == value2);
533 }
534
535 /**
536 * @tc.name: Normal Sync 007
537 * @tc.desc: Test normal push_pull sync for add data.
538 * @tc.type: FUNC
539 * @tc.require: AR000CCPOM
540 * @tc.author: xushaohua
541 */
542 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync007, TestSize.Level1)
543 {
544 DBStatus status = OK;
545 std::vector<std::string> devices;
546 devices.push_back(g_deviceB->GetDeviceId());
547 devices.push_back(g_deviceC->GetDeviceId());
548
549 /**
550 * @tc.steps: step1. deviceA put {k1, v1}
551 */
552 Key key1 = {'1'};
553 Value value1 = {'1'};
554 status = g_kvDelegatePtr->Put(key1, value1);
555 EXPECT_TRUE(status == OK);
556
557 /**
558 * @tc.steps: step1. deviceB put {k2, v2}
559 */
560 Key key2 = {'2'};
561 Value value2 = {'2'};
562 g_deviceB->PutData(key2, value2, 0, 0);
563
564 /**
565 * @tc.steps: step1. deviceB put {k3, v3}
566 */
567 Key key3 = {'3'};
568 Value value3 = {'3'};
569 g_deviceC->PutData(key3, value3, 0, 0);
570
571 /**
572 * @tc.steps: step4. deviceA call push_pull sync
573 * @tc.expected: step4. sync should return OK.
574 */
575 std::map<std::string, DBStatus> result;
576 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
577 ASSERT_TRUE(status == OK);
578
579 ASSERT_TRUE(result.size() == devices.size());
580 for (const auto &pair : result) {
581 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
582 EXPECT_TRUE(pair.second == OK);
583 }
584
585 /**
586 * @tc.expected: step4. onComplete should be called, DeviceA have {k1. v1}, {k2, v2}, {k3, v3}
587 * deviceB received {k1. v1}, don't received k3, deviceC received {k1. v1}, don't received k2
588 */
589 Value value4;
590 g_kvDelegatePtr->Get(key2, value4);
591 EXPECT_TRUE(value4 == value2);
592 g_kvDelegatePtr->Get(key3, value4);
593 EXPECT_TRUE(value4 == value3);
594
595 VirtualDataItem item1;
596 g_deviceB->GetData(key1, item1);
597 EXPECT_TRUE(item1.value == value1);
598 item1.value.clear();
599 g_deviceB->GetData(key3, item1);
600 EXPECT_TRUE(item1.value.empty());
601
602 VirtualDataItem item2;
603 g_deviceC->GetData(key1, item2);
604 EXPECT_TRUE(item2.value == value1);
605 item2.value.clear();
606 g_deviceC->GetData(key2, item2);
607 EXPECT_TRUE(item2.value.empty());
608 }
609
610 /**
611 * @tc.name: Normal Sync 008
612 * @tc.desc: Test normal push_pull sync for update data.
613 * @tc.type: FUNC
614 * @tc.require: AR000CCPOM
615 * @tc.author: xushaohua
616 */
617 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync008, TestSize.Level2)
618 {
619 DBStatus status = OK;
620 std::vector<std::string> devices;
621 devices.push_back(g_deviceB->GetDeviceId());
622 devices.push_back(g_deviceC->GetDeviceId());
623
624 /**
625 * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
626 */
627 Key key1 = {'1'};
628 Value value1 = {'1'};
629 status = g_kvDelegatePtr->Put(key1, value1);
630 ASSERT_TRUE(status == OK);
631
632 Key key2 = {'2'};
633 Value value2 = {'2'};
634 status = g_kvDelegatePtr->Put(key2, value2);
635 ASSERT_TRUE(status == OK);
636
637 /**
638 * @tc.steps: step2. deviceB put {k1, v3} t2, t2 > t1
639 */
640 Value value3 = {'3'};
641 g_deviceB->PutData(key1, value3,
642 TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 0);
643
644 /**
645 * @tc.steps: step3. deviceB put {k1, v4} t3, t4 <t1
646 */
647 Value value4 = {'4'};
648 g_deviceC->PutData(key2, value4,
649 TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 0);
650 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
651
652 /**
653 * @tc.steps: step4. deviceA call push_pull sync
654 * @tc.expected: step4. sync should return OK.
655 */
656 std::map<std::string, DBStatus> result;
657 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
658 ASSERT_TRUE(status == OK);
659 ASSERT_TRUE(result.size() == devices.size());
660 for (const auto &pair : result) {
661 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
662 EXPECT_TRUE(pair.second == OK);
663 }
664
665 /**
666 * @tc.expected: step4. onComplete should be called, DeviceA have {k1. v3}, {k2, v2}
667 * deviceB have {k1. v3}, deviceC have {k2. v2}
668 */
669 Value value5;
670 g_kvDelegatePtr->Get(key1, value5);
671 EXPECT_EQ(value5, value3);
672 g_kvDelegatePtr->Get(key2, value5);
673 EXPECT_EQ(value5, value2);
674
675 VirtualDataItem item1;
676 g_deviceB->GetData(key1, item1);
677 EXPECT_TRUE(item1.value == value3);
678 item1.value.clear();
679 g_deviceB->GetData(key2, item1);
680 EXPECT_TRUE(item1.value == value2);
681
682 VirtualDataItem item2;
683 g_deviceC->GetData(key2, item2);
684 EXPECT_TRUE(item2.value == value2);
685 }
686
687 /**
688 * @tc.name: Normal Sync 009
689 * @tc.desc: Test normal push_pull sync for delete data.
690 * @tc.type: FUNC
691 * @tc.require: AR000CCPOM
692 * @tc.author: xushaohua
693 */
694 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync009, TestSize.Level2)
695 {
696 DBStatus status = OK;
697 std::vector<std::string> devices;
698 devices.push_back(g_deviceB->GetDeviceId());
699 devices.push_back(g_deviceC->GetDeviceId());
700
701 /**
702 * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
703 */
704 Key key1 = {'1'};
705 Value value1 = {'1'};
706 status = g_kvDelegatePtr->Put(key1, value1);
707 ASSERT_TRUE(status == OK);
708
709 Key key2 = {'2'};
710 Value value2 = {'2'};
711 status = g_kvDelegatePtr->Put(key2, value2);
712 ASSERT_TRUE(status == OK);
713
714 /**
715 * @tc.steps: step2. deviceB put {k1, delete} t2, t2 > t1
716 */
717 Key hashKey1;
718 DistributedDBToolsUnitTest::CalcHash(key1, hashKey1);
719 g_deviceB->PutData(hashKey1, value1,
720 TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 1);
721
722 /**
723 * @tc.steps: step3. deviceB put {k1, delete} t3, t2 < t1
724 */
725 Key hashKey2;
726 DistributedDBToolsUnitTest::CalcHash(key2, hashKey2);
727 g_deviceC->PutData(hashKey2, value2,
728 TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 1);
729
730 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
731 /**
732 * @tc.steps: step4. deviceA call push_pull sync
733 * @tc.expected: step4. sync should return OK.
734 */
735 std::map<std::string, DBStatus> result;
736 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
737 ASSERT_TRUE(status == OK);
738
739 /**
740 * @tc.expected: step4. onComplete should be called, DeviceA have {k1. delete}, {k2, v2}
741 * deviceB have {k2. v2}, deviceC have {k2. v2}
742 */
743 ASSERT_TRUE(result.size() == devices.size());
744 for (const auto &pair : result) {
745 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
746 EXPECT_TRUE(pair.second == OK);
747 }
748
749 Value value3;
750 g_kvDelegatePtr->Get(key1, value3);
751 EXPECT_TRUE(value3.empty());
752 value3.clear();
753 g_kvDelegatePtr->Get(key2, value3);
754 EXPECT_TRUE(value3 == value2);
755
756 VirtualDataItem item1;
757 g_deviceB->GetData(key2, item1);
758 EXPECT_TRUE(item1.value == value2);
759
760 VirtualDataItem item2;
761 g_deviceC->GetData(key2, item2);
762 EXPECT_TRUE(item2.value == value2);
763 }
764
765 /**
766 * @tc.name: Normal Sync 010
767 * @tc.desc: Test sync failed by invalid devices.
768 * @tc.type: FUNC
769 * @tc.require: AR000CCPOM
770 * @tc.author: zhangqiquan
771 */
772 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync010, TestSize.Level1)
773 {
774 DBStatus status = OK;
775 std::vector<std::string> devices;
776 std::string invalidDev = std::string(DBConstant::MAX_DEV_LENGTH + 1, '0');
777 devices.push_back(DEVICE_A);
778 devices.push_back(g_deviceB->GetDeviceId());
779 devices.push_back(invalidDev);
780
781 std::map<std::string, DBStatus> result;
782 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
783 ASSERT_TRUE(status == OK);
784
785 ASSERT_EQ(result.size(), devices.size());
786 EXPECT_EQ(result[DEVICE_A], INVALID_ARGS);
787 EXPECT_EQ(result[invalidDev], INVALID_ARGS);
788 EXPECT_EQ(result[DEVICE_B], OK);
789 }
790
791 /**
792 * @tc.name: Normal Sync 011
793 * @tc.desc: Test sync with translated id.
794 * @tc.type: FUNC
795 * @tc.require:
796 * @tc.author: zhangqiquan
797 */
798 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync011, TestSize.Level0)
799 {
800 DBStatus status = OK;
801 std::vector<std::string> devices;
802 devices.push_back(g_deviceB->GetDeviceId());
803
804 /**
805 * @tc.steps: step1. deviceA put {k1, v1}
806 */
807 Key key = {'1'};
808 Value value = {'1'};
809 status = g_kvDelegatePtr->Put(key, value);
810 ASSERT_TRUE(status == OK);
811
812 /**
813 * @tc.steps: step2. ori dev will be append test
814 * @tc.expected: step2. sync should return OK.
815 */
__anone3be83740302(const std::string &oriDevId, const StoreInfo &) 816 RuntimeConfig::SetTranslateToDeviceIdCallback([](const std::string &oriDevId, const StoreInfo &) {
817 std::string dev = oriDevId + "test";
818 LOGI("translate %s to %s", oriDevId.c_str(), dev.c_str());
819 return dev;
820 });
821 std::map<std::string, DBStatus> result;
822 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
823 ASSERT_TRUE(status == OK);
824 RuntimeConfig::SetTranslateToDeviceIdCallback(nullptr);
825
826 /**
827 * @tc.expected: step2. onComplete should be called, Send watermark should not be zero
828 */
829 ASSERT_TRUE(result.size() == devices.size());
830 for (const auto &pair : result) {
831 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
832 EXPECT_TRUE(pair.second == OK);
833 }
834 WatermarkInfo info;
835 CheckWatermark(g_deviceB->GetDeviceId(), g_kvDelegatePtr, info, false);
836 info = {};
837 std::string checkDev = g_deviceB->GetDeviceId() + "test";
838 CheckWatermark(checkDev, g_kvDelegatePtr, info, false);
839 }
840
841 /**
842 * @tc.name: Limit Data Sync 001
843 * @tc.desc: Test sync limit key and value data
844 * @tc.type: FUNC
845 * @tc.require: AR000CCPOM
846 * @tc.author: xushaohua
847 */
848 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, LimitDataSync001, TestSize.Level1)
849 {
850 DBStatus status = OK;
851 std::vector<std::string> devices;
852 devices.push_back(g_deviceB->GetDeviceId());
853
854 Key key1;
855 Value value1;
856 DistributedDBToolsUnitTest::GetRandomKeyValue(key1, DBConstant::MAX_KEY_SIZE + 1);
857 DistributedDBToolsUnitTest::GetRandomKeyValue(value1, DBConstant::MAX_VALUE_SIZE + 1);
858
859 Key key2;
860 Value value2;
861 DistributedDBToolsUnitTest::GetRandomKeyValue(key2, DBConstant::MAX_KEY_SIZE);
862 DistributedDBToolsUnitTest::GetRandomKeyValue(value2, DBConstant::MAX_VALUE_SIZE);
863
864 /**
865 * @tc.steps: step1. deviceB put {k1, v1}, K1 > 1k, v1 > 4M
866 */
867 g_deviceB->PutData(key1, value1, 0, 0);
868
869 /**
870 * @tc.steps: step2. deviceB put {k2, v2}, K2 = 1k, v2 = 4M
871 */
872 g_deviceC->PutData(key2, value2, 0, 0);
873
874 /**
875 * @tc.steps: step3. deviceA call pull sync from device B
876 * @tc.expected: step3. sync should return OK.
877 */
878 std::map<std::string, DBStatus> result;
879 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
880 ASSERT_TRUE(status == OK);
881
882 /**
883 * @tc.expected: step3. onComplete should be called.
884 */
885 ASSERT_TRUE(result.size() == devices.size());
886 for (const auto &pair : result) {
887 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
888 if (pair.first == g_deviceB->GetDeviceId()) {
889 EXPECT_TRUE(pair.second != OK);
890 } else {
891 EXPECT_TRUE(pair.second == OK);
892 }
893 }
894
895 /**
896 * @tc.steps: step4. deviceA call pull sync from deviceC
897 * @tc.expected: step4. sync should return OK.
898 */
899 devices.clear();
900 result.clear();
901 devices.push_back(g_deviceC->GetDeviceId());
902 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
903 ASSERT_TRUE(status == OK);
904
905 /**
906 * @tc.expected: step4. onComplete should be called, DeviceA have {k2. v2}, don't have {k1, v1}
907 */
908 ASSERT_TRUE(result.size() == devices.size());
909 for (const auto &pair : result) {
910 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
911 EXPECT_TRUE(pair.second == OK);
912 }
913
914 // Get value from A
915 Value valueRead;
916 EXPECT_TRUE(g_kvDelegatePtr->Get(key1, valueRead) != OK);
917 valueRead.clear();
918 EXPECT_EQ(g_kvDelegatePtr->Get(key2, valueRead), OK);
919 EXPECT_TRUE(valueRead == value2);
920 }
921
922 /**
923 * @tc.name: Limit Data Sync 002
924 * @tc.desc: Test PutBatch with invalid entries and then call sync.
925 * @tc.type: FUNC
926 * @tc.require: DTS2024012914038
927 * @tc.author: mazhao
928 */
929 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, LimitDataSync002, TestSize.Level1)
930 {
931 DBStatus status = OK;
932 std::vector<std::string> devices;
933 devices.push_back(g_deviceB->GetDeviceId());
934 devices.push_back(g_deviceC->GetDeviceId());
935 Key legalKey;
936 DistributedDBToolsUnitTest::GetRandomKeyValue(legalKey, DBConstant::MAX_KEY_SIZE); // 1K
937 Value legalValue;
938 DistributedDBToolsUnitTest::GetRandomKeyValue(legalValue, DBConstant::MAX_VALUE_SIZE); // 4M
939 Value emptyValue; // 0k
940 vector<Entry> illegalEntrys; // size is 512M + 1KB
941 for (int i = 0; i < 127; i++) { // 127 * (legalValue + legalKey) is equal to 508M + 127KB < 512M.
942 illegalEntrys.push_back({legalKey, legalValue});
943 }
944 for (int i = 0; i < 3970; i++) { // 3970 * legalKey is equal to 3970KB.
945 illegalEntrys.push_back({legalKey, emptyValue});
946 }
947 /**
948 * @tc.steps: step1. PutBatch with invalid entries inside which total length of the key and valud is more than 512M
949 * @tc.expected: step1. PutBatch should return INVALID_ARGS.
950 */
951 EXPECT_EQ(g_kvDelegatePtr->PutBatch(illegalEntrys), INVALID_ARGS);
952 /**
953 * @tc.steps: step2. deviceA call push_pull sync
954 * @tc.expected: step2. sync return OK and all statuses is OK.
955 */
956 std::map<std::string, DBStatus> result;
957 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
958 ASSERT_TRUE(status == OK);
959 ASSERT_TRUE(result.size() == devices.size());
960 for (const auto &pair : result) {
961 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
962 printf("dev %s, status %d", pair.first.c_str(), pair.second);
963 EXPECT_TRUE(pair.second == OK);
964 }
965 }
966
967 /**
968 * @tc.name: Auto Sync 001
969 * @tc.desc: Verify auto sync enable function.
970 * @tc.type: FUNC
971 * @tc.require: AR000CKRTD AR000CQE0E
972 * @tc.author: xushaohua
973 */
974 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, AutoSync001, TestSize.Level1)
975 {
976 std::vector<std::string> devices;
977 devices.push_back(g_deviceB->GetDeviceId());
978 devices.push_back(g_deviceC->GetDeviceId());
979
980 /**
981 * @tc.steps: step1. enable auto sync
982 * @tc.expected: step1, Pragma return OK.
983 */
984 bool autoSync = true;
985 PragmaData data = static_cast<PragmaData>(&autoSync);
986 DBStatus status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
987 ASSERT_EQ(status, OK);
988
989 /**
990 * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
991 */
992 ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
993 ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_2, VALUE_2) == OK);
994
995 /**
996 * @tc.steps: step3. sleep for data sync
997 * @tc.expected: step3. deviceB,C has {k1, v1}, {k2, v2}
998 */
999 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1000 VirtualDataItem item;
1001 g_deviceB->GetData(KEY_1, item);
1002 EXPECT_EQ(item.value, VALUE_1);
1003 g_deviceB->GetData(KEY_2, item);
1004 EXPECT_EQ(item.value, VALUE_2);
1005 g_deviceC->GetData(KEY_1, item);
1006 EXPECT_EQ(item.value, VALUE_1);
1007 g_deviceC->GetData(KEY_2, item);
1008 EXPECT_EQ(item.value, VALUE_2);
1009 }
1010
1011 /**
1012 * @tc.name: Auto Sync 002
1013 * @tc.desc: Verify auto sync disable function.
1014 * @tc.type: FUNC
1015 * @tc.require: AR000CKRTD AR000CQE0E
1016 * @tc.author: xushaohua
1017 */
1018 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, AutoSync002, TestSize.Level1)
1019 {
1020 std::vector<std::string> devices;
1021 devices.push_back(g_deviceB->GetDeviceId());
1022 devices.push_back(g_deviceC->GetDeviceId());
1023
1024 /**
1025 * @tc.steps: step1. disable auto sync
1026 * @tc.expected: step1, Pragma return OK.
1027 */
1028 bool autoSync = false;
1029 PragmaData data = static_cast<PragmaData>(&autoSync);
1030 DBStatus status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1031 ASSERT_EQ(status, OK);
1032
1033 /**
1034 * @tc.steps: step2. deviceB put {k1, v1}, deviceC put {k2, v2}
1035 */
1036 g_deviceB->PutData(KEY_1, VALUE_1, 0, 0);
1037 g_deviceC->PutData(KEY_2, VALUE_2, 0, 0);
1038
1039 /**
1040 * @tc.steps: step3. sleep for data sync
1041 * @tc.expected: step3. deviceA don't have k1, k2.
1042 */
1043 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1044 Value value3;
1045 EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_1, value3) == NOT_FOUND);
1046 EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_2, value3) == NOT_FOUND);
1047 }
1048
1049 /**
1050 * @tc.name: Block Sync 001
1051 * @tc.desc: Verify block push sync function.
1052 * @tc.type: FUNC
1053 * @tc.require: AR000CKRTD AR000CQE0E
1054 * @tc.author: xushaohua
1055 */
1056 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync001, TestSize.Level1)
1057 {
1058 std::vector<std::string> devices;
1059 devices.push_back(g_deviceB->GetDeviceId());
1060 devices.push_back(g_deviceC->GetDeviceId());
1061
1062 /**
1063 * @tc.steps: step1. deviceA put {k1, v1}
1064 */
1065 g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1066
1067 /**
1068 * @tc.steps: step2. deviceA call block push sync to deviceB & deviceC.
1069 * @tc.expected: step2. Sync return OK, devices status OK, deviceB & deivceC has {k1, v1}.
1070 */
1071 std::map<std::string, DBStatus> result;
1072 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1073 ASSERT_EQ(status, OK);
1074 ASSERT_TRUE(result.size() == devices.size());
1075 for (const auto &pair : result) {
1076 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1077 EXPECT_TRUE(pair.second == OK);
1078 }
1079 VirtualDataItem item1;
1080 EXPECT_EQ(g_deviceB->GetData(KEY_1, item1), E_OK);
1081 EXPECT_EQ(item1.value, VALUE_1);
1082 VirtualDataItem item2;
1083 EXPECT_EQ(g_deviceC->GetData(KEY_1, item2), E_OK);
1084 EXPECT_EQ(item2.value, VALUE_1);
1085 }
1086
1087 /**
1088 * @tc.name: Block Sync 002
1089 * @tc.desc: Verify block pull sync function.
1090 * @tc.type: FUNC
1091 * @tc.require: AR000CKRTD AR000CQE0E
1092 * @tc.author: xushaohua
1093 */
1094 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync002, TestSize.Level1)
1095 {
1096 std::vector<std::string> devices;
1097 devices.push_back(g_deviceB->GetDeviceId());
1098 devices.push_back(g_deviceC->GetDeviceId());
1099
1100 /**
1101 * @tc.steps: step1. deviceB put {k1, v1}, deviceC put {k2, v2}
1102 */
1103 g_deviceB->PutData(KEY_1, VALUE_1, 0, 0);
1104 g_deviceC->PutData(KEY_2, VALUE_2, 0, 0);
1105
1106 /**
1107 * @tc.steps: step2. deviceA call block pull and pull sync to deviceB & deviceC.
1108 * @tc.expected: step2. Sync return OK, devices status OK, deviceA has {k1, v1}, {k2, v2}
1109 */
1110 std::map<std::string, DBStatus> result;
1111 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1112 ASSERT_EQ(status, OK);
1113 ASSERT_TRUE(result.size() == devices.size());
1114 for (const auto &pair : result) {
1115 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1116 EXPECT_TRUE(pair.second == OK);
1117 }
1118 Value value3;
1119 EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_1, value3) == OK);
1120 EXPECT_TRUE(value3 == VALUE_1);
1121 EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_2, value3) == OK);
1122 EXPECT_TRUE(value3 == VALUE_2);
1123 }
1124
1125 /**
1126 * @tc.name: Block Sync 003
1127 * @tc.desc: Verify block push and pull sync function.
1128 * @tc.type: FUNC
1129 * @tc.require: AR000CKRTD AR000CQE0E
1130 * @tc.author: xushaohua
1131 */
1132 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync003, TestSize.Level1)
1133 {
1134 std::vector<std::string> devices;
1135 devices.push_back(g_deviceB->GetDeviceId());
1136 devices.push_back(g_deviceC->GetDeviceId());
1137
1138 /**
1139 * @tc.steps: step1. deviceA put {k1, v1}
1140 */
1141 g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1142
1143 /**
1144 * @tc.steps: step2. deviceB put {k1, v1}, deviceB put {k2, v2}
1145 */
1146 g_deviceB->PutData(KEY_2, VALUE_2, 0, 0);
1147 g_deviceC->PutData(KEY_3, VALUE_3, 0, 0);
1148
1149 /**
1150 * @tc.steps: step3. deviceA call block pull and pull sync to deviceB & deviceC.
1151 * @tc.expected: step3. Sync return OK, devices status OK, deviceA has {k1, v1}, {k2, v2} {k3, v3}
1152 * deviceB has {k1, v1}, {k2. v2} , deviceC has {k1, v1}, {k3, v3}
1153 */
1154 std::map<std::string, DBStatus> result;
1155 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, true);
1156 ASSERT_EQ(status, OK);
1157 ASSERT_TRUE(result.size() == devices.size());
1158 for (const auto &pair : result) {
1159 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1160 EXPECT_TRUE(pair.second == OK);
1161 }
1162
1163 VirtualDataItem item1;
1164 g_deviceB->GetData(KEY_1, item1);
1165 EXPECT_TRUE(item1.value == VALUE_1);
1166 g_deviceB->GetData(KEY_2, item1);
1167 EXPECT_TRUE(item1.value == VALUE_2);
1168
1169 VirtualDataItem item2;
1170 g_deviceC->GetData(KEY_1, item2);
1171 EXPECT_TRUE(item2.value == VALUE_1);
1172 g_deviceC->GetData(KEY_3, item2);
1173 EXPECT_TRUE(item2.value == VALUE_3);
1174
1175 Value value3;
1176 EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_1, value3) == OK);
1177 EXPECT_TRUE(value3 == VALUE_1);
1178 EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_2, value3) == OK);
1179 EXPECT_TRUE(value3 == VALUE_2);
1180 EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_3, value3) == OK);
1181 EXPECT_TRUE(value3 == VALUE_3);
1182 }
1183
1184 /**
1185 * @tc.name: Block Sync 004
1186 * @tc.desc: Verify block sync function invalid args.
1187 * @tc.type: FUNC
1188 * @tc.require: AR000CKRTD AR000CQE0E
1189 * @tc.author: xushaohua
1190 */
1191 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync004, TestSize.Level2)
1192 {
1193 std::vector<std::string> devices;
1194
1195 /**
1196 * @tc.steps: step1. deviceA put {k1, v1}
1197 */
1198 g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1199
1200 /**
1201 * @tc.steps: step2. deviceA call block push sync to deviceB & deviceC.
1202 * @tc.expected: step2. Sync return INVALID_ARGS
1203 */
1204 std::map<std::string, DBStatus> result;
1205 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1206 EXPECT_EQ(status, INVALID_ARGS);
1207
1208 /**
1209 * @tc.steps: step3. deviceB, deviceC offlinem and push deviceA sync to deviceB and deviceC.
1210 * @tc.expected: step3. Sync return OK, but the deviceB and deviceC are TIME_OUT
1211 */
1212 devices.push_back(g_deviceB->GetDeviceId());
1213 devices.push_back(g_deviceC->GetDeviceId());
1214 g_deviceB->Offline();
1215 g_deviceC->Offline();
1216
1217 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1218 EXPECT_EQ(status, OK);
1219 ASSERT_TRUE(result.size() == devices.size());
1220 for (const auto &pair : result) {
1221 // If syncTaskContext of deviceB is scheduled to be executed first, ClearAllSyncTask is
1222 // invoked when OfflineHandleByDevice is triggered, and SyncOperation::Finished() is triggered in advance.
1223 // The returned status is COMM_FAILURE
1224 EXPECT_TRUE((pair.second == static_cast<DBStatus>(-E_PERIPHERAL_INTERFACE_FAIL)) ||
1225 (pair.second == COMM_FAILURE));
1226 }
1227 }
1228
1229 /**
1230 * @tc.name: Block Sync 005
1231 * @tc.desc: Verify block sync function busy.
1232 * @tc.type: FUNC
1233 * @tc.require: AR000CKRTD AR000CQE0E
1234 * @tc.author: xushaohua
1235 */
1236 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync005, TestSize.Level2)
1237 {
1238 std::vector<std::string> devices;
1239 devices.push_back(g_deviceB->GetDeviceId());
1240 devices.push_back(g_deviceC->GetDeviceId());
1241 /**
1242 * @tc.steps: step1. deviceA put {k1, v1}
1243 */
1244 g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1245
1246 /**
1247 * @tc.steps: step2. New a thread to deviceA call block push sync to deviceB & deviceC,
1248 * but deviceB & C is blocked
1249 * @tc.expected: step2. Sync will be blocked util timeout, and then return OK
1250 */
1251 g_deviceB->Offline();
1252 g_deviceC->Offline();
__anone3be83740402() 1253 thread thread([devices]() {
1254 std::map<std::string, DBStatus> resultInner;
1255 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, resultInner, true);
1256 EXPECT_EQ(status, OK);
1257 });
1258 thread.detach();
1259 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1260 /**
1261 * @tc.steps: step3. sleep 1s and call sync.
1262 * @tc.expected: step3. Sync will return BUSY.
1263 */
1264 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1265 std::map<std::string, DBStatus> result;
1266 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, true);
1267 EXPECT_EQ(status, OK);
1268 }
1269
1270 /**
1271 * @tc.name: SyncQueue001
1272 * @tc.desc: Invalid args check of Pragma GET_QUEUED_SYNC_SIZE SET_QUEUED_SYNC_LIMIT and
1273 * GET_QUEUED_SYNC_LIMIT, expect return INVALID_ARGS.
1274 * @tc.type: FUNC
1275 * @tc.require: AR000D4876
1276 * @tc.author: wangchuanqing
1277 */
1278 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue001, TestSize.Level3)
1279 {
1280 /**
1281 * @tc.steps:step1. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE, and set param to be null
1282 * @tc.expected: step1. Expect return INVALID_ARGS.
1283 */
1284 int *param = nullptr;
1285 PragmaData input = static_cast<PragmaData>(param);
1286 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), INVALID_ARGS);
1287
1288 /**
1289 * @tc.steps:step2. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be null
1290 * @tc.expected: step2. Expect return INVALID_ARGS.
1291 */
1292 input = static_cast<PragmaData>(param);
1293 EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1294
1295 /**
1296 * @tc.steps:step3. Set PragmaCmd to be GET_QUEUED_SYNC_LIMIT, and set param to be null
1297 * @tc.expected: step3. Expect return INVALID_ARGS.
1298 */
1299 input = static_cast<PragmaData>(param);
1300 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1301
1302 /**
1303 * @tc.steps:step4. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be QUEUED_SYNC_LIMIT_MIN - 1
1304 * @tc.expected: step4. Expect return INVALID_ARGS.
1305 */
1306 int limit = DBConstant::QUEUED_SYNC_LIMIT_MIN - 1;
1307 input = static_cast<PragmaData>(&limit);
1308 EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1309
1310 /**
1311 * @tc.steps:step5. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be QUEUED_SYNC_LIMIT_MAX + 1
1312 * @tc.expected: step5. Expect return INVALID_ARGS.
1313 */
1314 limit = DBConstant::QUEUED_SYNC_LIMIT_MAX + 1;
1315 input = static_cast<PragmaData>(&limit);
1316 EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1317 }
1318
1319 /**
1320 * @tc.name: SyncQueue002
1321 * @tc.desc: Pragma GET_QUEUED_SYNC_LIMIT and SET_QUEUED_SYNC_LIMIT
1322 * @tc.type: FUNC
1323 * @tc.require: AR000D4876
1324 * @tc.author: wangchuanqing
1325 */
1326 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue002, TestSize.Level3)
1327 {
1328 /**
1329 * @tc.steps:step1. Set PragmaCmd to be GET_QUEUED_SYNC_LIMIT,
1330 * @tc.expected: step1. Expect return OK, limit eq QUEUED_SYNC_LIMIT_DEFAULT.
1331 */
1332 int limit = 0;
1333 PragmaData input = static_cast<PragmaData>(&limit);
1334 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), OK);
1335 EXPECT_EQ(limit, DBConstant::QUEUED_SYNC_LIMIT_DEFAULT);
1336
1337 /**
1338 * @tc.steps:step2. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be 50
1339 * @tc.expected: step2. Expect return OK.
1340 */
1341 limit = 50;
1342 input = static_cast<PragmaData>(&limit);
1343 EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), OK);
1344
1345 /**
1346 * @tc.steps:step3. Set PragmaCmd to be GET_QUEUED_SYNC_LIMIT,
1347 * @tc.expected: step3. Expect return OK, limit eq 50
1348 */
1349 limit = 0;
1350 input = static_cast<PragmaData>(&limit);
1351 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), OK);
1352 EXPECT_EQ(limit, 50);
1353 }
1354
1355 /**
1356 * @tc.name: SyncQueue003
1357 * @tc.desc: sync queue test
1358 * @tc.type: FUNC
1359 * @tc.require: AR000D4876
1360 * @tc.author: wangchuanqing
1361 */
1362 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue003, TestSize.Level3)
1363 {
1364 DBStatus status = OK;
1365 std::vector<std::string> devices;
1366 devices.push_back(g_deviceB->GetDeviceId());
1367 devices.push_back(g_deviceC->GetDeviceId());
1368
1369 /**
1370 * @tc.steps:step1. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE,
1371 * @tc.expected: step1. Expect return OK, size eq 0.
1372 */
1373 int size;
1374 PragmaData input = static_cast<PragmaData>(&size);
1375 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), OK);
1376 EXPECT_EQ(size, 0);
1377
1378 /**
1379 * @tc.steps:step2. deviceA put {k1, v1}
1380 */
1381 status = g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1382 ASSERT_TRUE(status == OK);
1383
1384 /**
1385 * @tc.steps:step3. deviceA sync SYNC_MODE_PUSH_ONLY
1386 */
1387 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1388 ASSERT_TRUE(status == OK);
1389
1390 /**
1391 * @tc.steps:step4. deviceA put {k2, v2}
1392 */
1393 status = g_kvDelegatePtr->Put(KEY_2, VALUE_2);
1394 ASSERT_TRUE(status == OK);
1395
1396 /**
1397 * @tc.steps:step5. deviceA sync SYNC_MODE_PUSH_ONLY
1398 */
1399 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1400 ASSERT_TRUE(status == OK);
1401
1402 /**
1403 * @tc.steps:step6. deviceB put {k3, v3}
1404 */
1405 g_deviceB->PutData(KEY_3, VALUE_3, 0, 0);
1406
1407 /**
1408 * @tc.steps:step7. deviceA put {k4, v4}
1409 */
1410 status = g_kvDelegatePtr->Put(KEY_4, VALUE_4);
1411 ASSERT_TRUE(status == OK);
1412
1413 /**
1414 * @tc.steps:step8. deviceA sync SYNC_MODE_PUSH_PULL
1415 */
1416 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_PULL, nullptr, false);
1417 ASSERT_TRUE(status == OK);
1418
1419 /**
1420 * @tc.steps:step9. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE,
1421 * @tc.expected: step1. Expect return OK, 0 <= size <= 4
1422 */
1423 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), OK);
1424 ASSERT_TRUE((size >= 0) && (size <= 4));
1425
1426 /**
1427 * @tc.steps:step10. deviceB put {k5, v5}
1428 */
1429 g_deviceB->PutData(KEY_5, VALUE_5, 0, 0);
1430
1431 /**
1432 * @tc.steps:step11. deviceA call sync and wait
1433 * @tc.expected: step11. sync should return OK.
1434 */
1435 std::map<std::string, DBStatus> result;
1436 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
1437 ASSERT_TRUE(status == OK);
1438
1439 /**
1440 * @tc.expected: step11. onComplete should be called, DeviceA,B,C have {k1,v1}~ {KEY_5,VALUE_5}
1441 */
1442 ASSERT_TRUE(result.size() == devices.size());
1443 for (const auto &pair : result) {
1444 EXPECT_TRUE(pair.second == OK);
1445 }
1446 VirtualDataItem item;
1447 g_deviceB->GetData(KEY_1, item);
1448 EXPECT_TRUE(item.value == VALUE_1);
1449 g_deviceB->GetData(KEY_2, item);
1450 EXPECT_TRUE(item.value == VALUE_2);
1451 g_deviceB->GetData(KEY_3, item);
1452 EXPECT_TRUE(item.value == VALUE_3);
1453 g_deviceB->GetData(KEY_4, item);
1454 EXPECT_TRUE(item.value == VALUE_4);
1455 g_deviceB->GetData(KEY_5, item);
1456 EXPECT_TRUE(item.value == VALUE_5);
1457 Value value;
1458 EXPECT_EQ(g_kvDelegatePtr->Get(KEY_3, value), OK);
1459 EXPECT_EQ(VALUE_3, value);
1460 EXPECT_EQ(g_kvDelegatePtr->Get(KEY_5, value), OK);
1461 EXPECT_EQ(VALUE_5, value);
1462 }
1463
1464 /**
1465 * @tc.name: SyncQueue004
1466 * @tc.desc: sync queue full test
1467 * @tc.type: FUNC
1468 * @tc.require: AR000D4876
1469 * @tc.author: wangchuanqing
1470 */
1471 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue004, TestSize.Level3)
1472 {
1473 DBStatus status = OK;
1474 std::vector<std::string> devices;
1475 devices.push_back(g_deviceB->GetDeviceId());
1476 devices.push_back(g_deviceC->GetDeviceId());
1477
1478 /**
1479 * @tc.steps:step1. deviceB C block
1480 */
1481 g_communicatorAggregator->SetBlockValue(true);
1482
1483 /**
1484 * @tc.steps:step2. deviceA put {k1, v1}
1485 */
1486 status = g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1487 ASSERT_TRUE(status == OK);
1488
1489 /**
1490 * @tc.steps:step3. deviceA sync QUEUED_SYNC_LIMIT_DEFAULT times
1491 * @tc.expected: step3. Expect return OK
1492 */
1493 for (int i = 0; i < DBConstant::QUEUED_SYNC_LIMIT_DEFAULT; i++) {
1494 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1495 ASSERT_TRUE(status == OK);
1496 }
1497
1498 /**
1499 * @tc.steps:step4. deviceA sync
1500 * @tc.expected: step4. Expect return BUSY
1501 */
1502 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1503 ASSERT_TRUE(status == BUSY);
1504 g_communicatorAggregator->SetBlockValue(false);
1505 }
1506
1507 /**
1508 * @tc.name: SyncQueue005
1509 * @tc.desc: block sync queue test
1510 * @tc.type: FUNC
1511 * @tc.require: AR000D4876
1512 * @tc.author: wangchuanqing
1513 */
1514 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue005, TestSize.Level3)
1515 {
1516 std::vector<std::string> devices;
1517 devices.push_back(g_deviceB->GetDeviceId());
1518 devices.push_back(g_deviceC->GetDeviceId());
1519 /**
1520 * @tc.steps:step1. New a thread to deviceA call block push sync to deviceB & deviceC,
1521 * but deviceB & C is offline
1522 * @tc.expected: step1. Sync will be blocked util timeout, and then return OK
1523 */
1524 g_deviceB->Offline();
1525 g_deviceC->Offline();
1526 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1527
1528 /**
1529 * @tc.steps:step2. deviceA put {k1, v1}
1530 */
1531 g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1532
1533 std::mutex lockMutex;
1534 std::condition_variable conditionVar;
1535
__anone3be83740502() 1536 std::thread threadFirst([devices]() {
1537 std::map<std::string, DBStatus> resultInner;
1538 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, resultInner, true);
1539 EXPECT_EQ(status, OK);
1540 });
1541 threadFirst.detach();
1542 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1543 /**
1544 * @tc.steps:step3. New a thread to deviceA call block push sync to deviceB & deviceC,
1545 * but deviceB & C is offline
1546 * @tc.expected: step2. Sync will be blocked util timeout, and then return OK
1547 */
__anone3be83740602() 1548 std::thread threadSecond([devices, &lockMutex, &conditionVar]() {
1549 std::map<std::string, DBStatus> resultInner;
1550 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, resultInner, true);
1551 EXPECT_EQ(status, OK);
1552 std::unique_lock<mutex> lockInner(lockMutex);
1553 conditionVar.notify_one();
1554 });
1555 threadSecond.detach();
1556
1557 /**
1558 * @tc.steps:step4. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE,
1559 * @tc.expected: step1. Expect return OK, size eq 0.
1560 */
1561 int size;
1562 PragmaData input = static_cast<PragmaData>(&size);
1563 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), OK);
1564 EXPECT_EQ(size, 0);
1565
1566 /**
1567 * @tc.steps:step5. wait exit
1568 */
1569 std::unique_lock<mutex> lock(lockMutex);
1570 auto now = std::chrono::system_clock::now();
1571 conditionVar.wait_until(lock, now + 2 * INT8_MAX * 1000ms);
1572 }
1573
1574 /**
1575 * @tc.name: CalculateSyncData001
1576 * @tc.desc: Test sync data whose device never synced before
1577 * @tc.type: FUNC
1578 * @tc.require: AR000HI2JS
1579 * @tc.author: zhuwentao
1580 */
1581 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData001, TestSize.Level3)
1582 {
1583 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1584 size_t dataSize = g_kvDelegatePtr->GetSyncDataSize(DEVICE_B);
1585 uint32_t serialHeadLen = 8u;
1586 EXPECT_EQ(static_cast<uint32_t>(dataSize), 0u + serialHeadLen);
1587 uint32_t keySize = 256u;
1588 uint32_t valuesize = 1024u;
1589 uint32_t itemCount = 10u;
1590 CalculateDataTest(itemCount, keySize, valuesize);
1591 }
1592
1593 /**
1594 * @tc.name: CalculateSyncData002
1595 * @tc.desc: Test sync data whose device synced before, but sync data is less than 1M
1596 * @tc.type: FUNC
1597 * @tc.require: AR000HI2JS
1598 * @tc.author: zhuwentao
1599 */
1600 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData002, TestSize.Level3)
1601 {
1602 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1603 Key key1 = {'1'};
1604 Value value1 = {'1'};
1605 EXPECT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
1606
1607 std::vector<std::string> devices;
1608 devices.push_back(g_deviceB->GetDeviceId());
1609 std::map<std::string, DBStatus> result;
1610 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1611 ASSERT_TRUE(status == OK);
1612 ASSERT_TRUE(result.size() == devices.size());
1613 for (const auto &pair : result) {
1614 EXPECT_TRUE(pair.second == OK);
1615 }
1616
1617 uint32_t keySize = 256u;
1618 uint32_t valuesize = 512u;
1619 uint32_t itemCount = 20u;
1620 CalculateDataTest(itemCount, keySize, valuesize);
1621 }
1622
1623 /**
1624 * @tc.name: CalculateSyncData003
1625 * @tc.desc: Test sync data whose device synced before, but sync data is larger than 1M
1626 * @tc.type: FUNC
1627 * @tc.require: AR000HI2JS
1628 * @tc.author: zhuwentao
1629 */
1630 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData003, TestSize.Level3)
1631 {
1632 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1633 Key key1 = {'1'};
1634 Value value1 = {'1'};
1635 EXPECT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
1636
1637 std::vector<std::string> devices;
1638 devices.push_back(g_deviceB->GetDeviceId());
1639 std::map<std::string, DBStatus> result;
1640 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1641 ASSERT_TRUE(status == OK);
1642 ASSERT_TRUE(result.size() == devices.size());
1643 for (const auto &pair : result) {
1644 EXPECT_TRUE(pair.second == OK);
1645 }
1646 uint32_t keySize = 256u;
1647 uint32_t valuesize = 1024u;
1648 uint32_t itemCount = 2048u;
1649 CalculateDataTest(itemCount, keySize, valuesize);
1650 }
1651
1652 /**
1653 * @tc.name: CalculateSyncData004
1654 * @tc.desc: Test invalid device when call GetSyncDataSize interface
1655 * @tc.type: FUNC
1656 * @tc.require: AR000HI2JS
1657 * @tc.author: zhuwentao
1658 */
1659 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData004, TestSize.Level3)
1660 {
1661 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1662 std::string device;
1663 EXPECT_EQ(g_kvDelegatePtr->GetSyncDataSize(device), 0u);
1664 }
1665
1666 /**
1667 * @tc.name: CalculateSyncData005
1668 * @tc.desc: Test CalculateSyncData and rekey Concurrently
1669 * @tc.type: FUNC
1670 * @tc.require: AR000HI2JS
1671 * @tc.author: zhuwentao
1672 */
1673 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData005, TestSize.Level3)
1674 {
1675 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1676 size_t dataSize = 0;
1677 Key key1 = {'1'};
1678 Value value1 = {'1'};
1679 EXPECT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
__anone3be83740702() 1680 std::thread thread1([]() {
1681 std::this_thread::sleep_for(std::chrono::milliseconds(1));
1682 CipherPassword passwd; // random password
1683 vector<uint8_t> passwdBuffer(10, 45); // 10 and 45 as random password.
1684 passwd.SetValue(passwdBuffer.data(), passwdBuffer.size());
1685 g_kvDelegatePtr->Rekey(passwd);
1686 });
__anone3be83740802() 1687 std::thread thread2([&dataSize, &key1, &value1]() {
1688 dataSize = g_kvDelegatePtr->GetSyncDataSize(DEVICE_B);
1689 if (dataSize > 0) {
1690 uint32_t expectedDataSize = (key1.size() + value1.size());
1691 uint32_t externalSize = 70u;
1692 uint32_t serialHeadLen = 8u;
1693 ASSERT_GE(static_cast<uint32_t>(dataSize), expectedDataSize);
1694 ASSERT_LE(static_cast<uint32_t>(dataSize), serialHeadLen + expectedDataSize + externalSize);
1695 }
1696 });
1697 thread1.join();
1698 thread2.join();
1699 }
1700
1701 /**
1702 * @tc.name: GetWaterMarkInfo001
1703 * @tc.desc: Test invalid dev for get water mark info.
1704 * @tc.type: FUNC
1705 * @tc.require:
1706 * @tc.author: zhangqiquan
1707 */
1708 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, GetWaterMarkInfo001, TestSize.Level0)
1709 {
1710 std::string dev;
1711 auto res = g_kvDelegatePtr->GetWatermarkInfo(dev);
1712 EXPECT_EQ(res.first, INVALID_ARGS);
1713 EXPECT_EQ(res.second.sendMark, 0u);
1714 EXPECT_EQ(res.second.receiveMark, 0u);
1715
1716 dev = std::string(DBConstant::MAX_DEV_LENGTH + 1, 'a');
1717 res = g_kvDelegatePtr->GetWatermarkInfo(dev);
1718 EXPECT_EQ(res.first, INVALID_ARGS);
1719 EXPECT_EQ(res.second.sendMark, 0u);
1720 EXPECT_EQ(res.second.receiveMark, 0u);
1721 }
1722
1723 /**
1724 * @tc.name: QuerySyncRetry001
1725 * @tc.desc: use sync retry sync use query pull by compress
1726 * @tc.type: FUNC
1727 * @tc.require:
1728 * @tc.author: zhangqiquan
1729 */
1730 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, QuerySyncRetry001, TestSize.Level3)
1731 {
1732 if (g_kvDelegatePtr != nullptr) {
1733 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1734 g_kvDelegatePtr = nullptr;
1735 }
1736 /**
1737 * @tc.steps: step1. open db use Compress
1738 * @tc.expected: step1, Pragma return OK.
1739 */
1740 KvStoreNbDelegate::Option option;
1741 option.isNeedCompressOnSync = true;
1742 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1743 ASSERT_TRUE(g_kvDelegateStatus == OK);
1744 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1745
1746 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, QUERY_SYNC_MESSAGE);
1747 std::vector<std::string> devices;
1748 devices.push_back(g_deviceB->GetDeviceId());
1749
1750 /**
1751 * @tc.steps: step2. set sync retry
1752 * @tc.expected: step2, Pragma return OK.
1753 */
1754 int pragmaData = 1;
1755 PragmaData input = static_cast<PragmaData>(&pragmaData);
1756 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1757
1758 /**
1759 * @tc.steps: step3. deviceA call sync and wait
1760 * @tc.expected: step3. sync should return OK.
1761 */
1762 std::map<std::string, DBStatus> result;
1763 auto query = Query::Select().PrefixKey({});
1764 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query) == OK);
1765
1766 /**
1767 * @tc.expected: step4. onComplete should be called, and status is time_out
1768 */
1769 ASSERT_TRUE(result.size() == devices.size());
1770 for (const auto &pair : result) {
1771 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1772 EXPECT_EQ(pair.second, OK);
1773 }
1774 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1775 }
1776 }