1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <condition_variable>
17 #include <gtest/gtest.h>
18 #include <thread>
19
20 #include "db_constant.h"
21 #include "db_common.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "platform_specific.h"
27 #include "runtime_config.h"
28 #include "single_ver_data_sync.h"
29 #include "single_ver_kv_sync_task_context.h"
30
31 using namespace testing::ext;
32 using namespace DistributedDB;
33 using namespace DistributedDBUnitTest;
34 using namespace std;
35
36 namespace {
37 string g_testDir;
38 const string STORE_ID = "kv_stroe_sync_test";
39 const int64_t TIME_OFFSET = 5000000;
40 const int WAIT_TIME = 1000;
41 const std::string DEVICE_A = "real_device";
42 const std::string DEVICE_B = "deviceB";
43 const std::string DEVICE_C = "deviceC";
44
45 KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
46 KvStoreConfig g_config;
47 DistributedDBToolsUnitTest g_tool;
48 DBStatus g_kvDelegateStatus = INVALID_ARGS;
49 KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
50 VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
51 KvVirtualDevice *g_deviceB = nullptr;
52 KvVirtualDevice *g_deviceC = nullptr;
53
54 // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
55 auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
56 placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
57
CalculateDataTest(uint32_t itemCount,uint32_t keySize,uint32_t valueSize)58 void CalculateDataTest(uint32_t itemCount, uint32_t keySize, uint32_t valueSize)
59 {
60 for (uint32_t i = 0; i < itemCount; i++) {
61 std::vector<uint8_t> prefixKey = {'a', 'b', 'c'};
62 Key key = DistributedDBToolsUnitTest::GetRandPrefixKey(prefixKey, keySize);
63 Value value;
64 DistributedDBToolsUnitTest::GetRandomKeyValue(value, valueSize);
65 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
66 }
67 size_t dataSize = g_kvDelegatePtr->GetSyncDataSize(DEVICE_B);
68 uint32_t expectedDataSize = (valueSize + keySize);
69 uint32_t externalSize = 70u;
70 uint32_t serialHeadLen = 8u;
71 LOGI("expectedDataSize=%u, v=%u", expectedDataSize, externalSize);
72 uint32_t maxDataSize = 1024u * 1024u;
73 if (itemCount * expectedDataSize >= maxDataSize) {
74 EXPECT_EQ(static_cast<uint32_t>(dataSize), maxDataSize);
75 return;
76 }
77 ASSERT_GE(static_cast<uint32_t>(dataSize), itemCount * expectedDataSize);
78 ASSERT_LE(static_cast<uint32_t>(dataSize), serialHeadLen + itemCount * (expectedDataSize + externalSize));
79 }
80
81 class DistributedDBSingleVerP2PSimpleSyncTest : public testing::Test {
82 public:
83 static void SetUpTestCase(void);
84 static void TearDownTestCase(void);
85 void SetUp();
86 void TearDown();
87 };
88
SetUpTestCase(void)89 void DistributedDBSingleVerP2PSimpleSyncTest::SetUpTestCase(void)
90 {
91 /**
92 * @tc.setup: Init datadir and Virtual Communicator.
93 */
94 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
95 g_config.dataDir = g_testDir;
96 g_mgr.SetKvStoreConfig(g_config);
97
98 string dir = g_testDir + "/single_ver";
99 DIR* dirTmp = opendir(dir.c_str());
100 if (dirTmp == nullptr) {
101 OS::MakeDBDirectory(dir);
102 } else {
103 closedir(dirTmp);
104 }
105
106 g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
107 ASSERT_TRUE(g_communicatorAggregator != nullptr);
108 RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
109 }
110
TearDownTestCase(void)111 void DistributedDBSingleVerP2PSimpleSyncTest::TearDownTestCase(void)
112 {
113 /**
114 * @tc.teardown: Release virtual Communicator and clear data dir.
115 */
116 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
117 LOGE("rm test db files error!");
118 }
119 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
120 }
121
SetUp(void)122 void DistributedDBSingleVerP2PSimpleSyncTest::SetUp(void)
123 {
124 DistributedDBToolsUnitTest::PrintTestCaseInfo();
125 /**
126 * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
127 */
128 KvStoreNbDelegate::Option option;
129 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
130 ASSERT_TRUE(g_kvDelegateStatus == OK);
131 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
132 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
133 ASSERT_TRUE(g_deviceB != nullptr);
134 VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
135 ASSERT_TRUE(syncInterfaceB != nullptr);
136 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
137
138 g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
139 ASSERT_TRUE(g_deviceC != nullptr);
140 VirtualSingleVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
141 ASSERT_TRUE(syncInterfaceC != nullptr);
142 ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
143
144 auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
145 const std::string &deviceId, uint8_t flag) -> bool {
146 return true;
147 };
148 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
149 }
150
TearDown(void)151 void DistributedDBSingleVerP2PSimpleSyncTest::TearDown(void)
152 {
153 /**
154 * @tc.teardown: Release device A, B, C
155 */
156 if (g_kvDelegatePtr != nullptr) {
157 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
158 g_kvDelegatePtr = nullptr;
159 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
160 LOGD("delete kv store status %d", status);
161 ASSERT_TRUE(status == OK);
162 }
163 if (g_deviceB != nullptr) {
164 delete g_deviceB;
165 g_deviceB = nullptr;
166 }
167 if (g_deviceC != nullptr) {
168 delete g_deviceC;
169 g_deviceC = nullptr;
170 }
171 PermissionCheckCallbackV2 nullCallback;
172 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
173 }
174
CheckWatermark(const std::string & dev,KvStoreNbDelegate * kvDelegatePtr,WatermarkInfo expectInfo,bool sendEqual=true,bool receiveEqual=true)175 void CheckWatermark(const std::string &dev, KvStoreNbDelegate *kvDelegatePtr, WatermarkInfo expectInfo,
176 bool sendEqual = true, bool receiveEqual = true)
177 {
178 auto [status, watermarkInfo] = kvDelegatePtr->GetWatermarkInfo(dev);
179 EXPECT_EQ(status, OK);
180 if (sendEqual) {
181 EXPECT_EQ(watermarkInfo.sendMark, expectInfo.sendMark);
182 } else {
183 EXPECT_NE(watermarkInfo.sendMark, expectInfo.sendMark);
184 }
185 if (receiveEqual) {
186 EXPECT_EQ(watermarkInfo.receiveMark, expectInfo.receiveMark);
187 } else {
188 EXPECT_NE(watermarkInfo.receiveMark, expectInfo.receiveMark);
189 }
190 }
191
192 /**
193 * @tc.name: Normal Sync 001
194 * @tc.desc: Test normal push sync for add data.
195 * @tc.type: FUNC
196 * @tc.require:
197 * @tc.author: xushaohua
198 */
199 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync001, TestSize.Level1)
200 {
201 DBStatus status = OK;
202 std::vector<std::string> devices;
203 devices.push_back(g_deviceB->GetDeviceId());
204 devices.push_back(g_deviceC->GetDeviceId());
205
206 /**
207 * @tc.steps: step1. deviceA put {k1, v1}
208 */
209 Key key = {'1'};
210 Value value = {'1'};
211
212 WatermarkInfo info;
213 CheckWatermark(g_deviceB->GetDeviceId(), g_kvDelegatePtr, info);
214 CheckWatermark(g_deviceC->GetDeviceId(), g_kvDelegatePtr, info);
215 status = g_kvDelegatePtr->Put(key, value);
216 ASSERT_TRUE(status == OK);
217
218 /**
219 * @tc.steps: step2. deviceA call sync and wait
220 * @tc.expected: step2. sync should return OK.
221 */
222 std::map<std::string, DBStatus> result;
223 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
224 ASSERT_TRUE(status == OK);
225 CheckWatermark(g_deviceB->GetDeviceId(), g_kvDelegatePtr, info, false);
226 CheckWatermark(g_deviceC->GetDeviceId(), g_kvDelegatePtr, info, false);
227
228 /**
229 * @tc.expected: step2. onComplete should be called, DeviceB,C have {k1,v1}
230 */
231 ASSERT_TRUE(result.size() == devices.size());
232 for (const auto &pair : result) {
233 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
234 EXPECT_TRUE(pair.second == OK);
235 }
236 VirtualDataItem item;
237 g_deviceB->GetData(key, item);
238 EXPECT_TRUE(item.value == value);
239 g_deviceC->GetData(key, item);
240 EXPECT_TRUE(item.value == value);
241 }
242
243 /**
244 * @tc.name: Normal Sync 002
245 * @tc.desc: Test normal push sync for update data.
246 * @tc.type: FUNC
247 * @tc.require:
248 * @tc.author: xushaohua
249 */
250 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync002, TestSize.Level1)
251 {
252 DBStatus status = OK;
253 std::vector<std::string> devices;
254 devices.push_back(g_deviceB->GetDeviceId());
255 devices.push_back(g_deviceC->GetDeviceId());
256
257 /**
258 * @tc.steps: step1. deviceA put {k1, v1}
259 */
260 Key key = {'1'};
261 Value value = {'1'};
262 status = g_kvDelegatePtr->Put(key, value);
263 ASSERT_TRUE(status == OK);
264
265 /**
266 * @tc.steps: step2. deviceA put {k1, v2}
267 */
268 Value value2;
269 value2.push_back('2');
270 status = g_kvDelegatePtr->Put(key, value2);
271 ASSERT_TRUE(status == OK);
272
273 /**
274 * @tc.steps: step3. deviceA call sync and wait
275 * @tc.expected: step3. sync should return OK.
276 */
277 std::map<std::string, DBStatus> result;
278 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
279 ASSERT_TRUE(status == OK);
280
281 /**
282 * @tc.expected: step3. onComplete should be called, DeviceB,C have {k1,v2}
283 */
284 ASSERT_TRUE(result.size() == devices.size());
285 for (const auto &pair : result) {
286 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
287 EXPECT_TRUE(pair.second == OK);
288 }
289 VirtualDataItem item;
290 g_deviceC->GetData(key, item);
291 EXPECT_TRUE(item.value == value2);
292 g_deviceB->GetData(key, item);
293 EXPECT_TRUE(item.value == value2);
294 }
295
296 /**
297 * @tc.name: Normal Sync 003
298 * @tc.desc: Test normal push sync for delete data.
299 * @tc.type: FUNC
300 * @tc.require:
301 * @tc.author: xushaohua
302 */
303 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync003, TestSize.Level1)
304 {
305 DBStatus status = OK;
306 std::vector<std::string> devices;
307 devices.push_back(g_deviceB->GetDeviceId());
308 devices.push_back(g_deviceC->GetDeviceId());
309
310 /**
311 * @tc.steps: step1. deviceA put {k1, v1}
312 */
313 Key key = {'1'};
314 Value value = {'1'};
315 status = g_kvDelegatePtr->Put(key, value);
316 ASSERT_TRUE(status == OK);
317
318 /**
319 * @tc.steps: step2. deviceA delete k1
320 */
321 status = g_kvDelegatePtr->Delete(key);
322 ASSERT_TRUE(status == OK);
323 std::map<std::string, DBStatus> result;
324 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
325 ASSERT_TRUE(status == OK);
326
327 /**
328 * @tc.steps: step3. deviceA call sync and wait
329 * @tc.expected: step3. sync should return OK.
330 */
331 ASSERT_TRUE(result.size() == devices.size());
332 for (const auto &pair : result) {
333 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
334 EXPECT_TRUE(pair.second == OK);
335 }
336
337 /**
338 * @tc.expected: step3. onComplete should be called, DeviceB,C have {k1, delete}
339 */
340 VirtualDataItem item;
341 Key hashKey;
342 DistributedDBToolsUnitTest::CalcHash(key, hashKey);
343 EXPECT_EQ(g_deviceB->GetData(hashKey, item), -E_NOT_FOUND);
344 EXPECT_EQ(g_deviceC->GetData(hashKey, item), -E_NOT_FOUND);
345 }
346
347 /**
348 * @tc.name: Normal Sync 004
349 * @tc.desc: Test normal pull sync for add data.
350 * @tc.type: FUNC
351 * @tc.require:
352 * @tc.author: xushaohua
353 */
354 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync004, TestSize.Level1)
355 {
356 DBStatus status = OK;
357 std::vector<std::string> devices;
358 devices.push_back(g_deviceB->GetDeviceId());
359 devices.push_back(g_deviceC->GetDeviceId());
360
361 WatermarkInfo info;
362 CheckWatermark(g_deviceB->GetDeviceId(), g_kvDelegatePtr, info);
363 CheckWatermark(g_deviceC->GetDeviceId(), g_kvDelegatePtr, info);
364 /**
365 * @tc.steps: step1. deviceB put {k1, v1}
366 */
367 Key key = {'1'};
368 Value value = {'1'};
369 g_deviceB->PutData(key, value, 0, 0);
370
371 /**
372 * @tc.steps: step2. deviceB put {k2, v2}
373 */
374 Key key2 = {'2'};
375 Value value2 = {'2'};
376 g_deviceC->PutData(key2, value2, 0, 0);
377 ASSERT_TRUE(status == OK);
378
379 /**
380 * @tc.steps: step3. deviceA call pull sync
381 * @tc.expected: step3. sync should return OK.
382 */
383 std::map<std::string, DBStatus> result;
384 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
385 ASSERT_TRUE(status == OK);
386 CheckWatermark(g_deviceB->GetDeviceId(), g_kvDelegatePtr, info, true, false);
387 CheckWatermark(g_deviceC->GetDeviceId(), g_kvDelegatePtr, info, true, false);
388
389 /**
390 * @tc.expected: step3. onComplete should be called, DeviceA have {k1, VALUE_1}, {K2. VALUE_2}
391 */
392 ASSERT_TRUE(result.size() == devices.size());
393 for (const auto &pair : result) {
394 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
395 EXPECT_TRUE(pair.second == OK);
396 }
397 Value value3;
398 EXPECT_EQ(g_kvDelegatePtr->Get(key, value3), OK);
399 EXPECT_EQ(value3, value);
400 EXPECT_EQ(g_kvDelegatePtr->Get(key2, value3), OK);
401 EXPECT_EQ(value3, value2);
402 }
403
404 /**
405 * @tc.name: Normal Sync 005
406 * @tc.desc: Test normal pull sync for update data.
407 * @tc.type: FUNC
408 * @tc.require:
409 * @tc.author: xushaohua
410 */
411 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync005, TestSize.Level2)
412 {
413 DBStatus status = OK;
414 std::vector<std::string> devices;
415 devices.push_back(g_deviceB->GetDeviceId());
416 devices.push_back(g_deviceC->GetDeviceId());
417
418 /**
419 * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
420 */
421 Key key1 = {'1'};
422 Value value1 = {'1'};
423 status = g_kvDelegatePtr->Put(key1, value1);
424 ASSERT_TRUE(status == OK);
425 Key key2 = {'2'};
426 Value value2 = {'2'};
427 status = g_kvDelegatePtr->Put(key2, value2);
428 ASSERT_TRUE(status == OK);
429
430 /**
431 * @tc.steps: step2. deviceB put {k1, v3} t2, t2 > t1
432 */
433 Value value3;
434 value3.push_back('3');
435 g_deviceB->PutData(key1, value3,
436 TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 0);
437
438 /**
439 * @tc.steps: step3. deviceC put {k2, v4} t2, t4 < t1
440 */
441 Value value4;
442 value4.push_back('4');
443 g_deviceC->PutData(key2, value4,
444 TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 0);
445
446 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
447 /**
448 * @tc.steps: step4. deviceA call pull sync
449 * @tc.expected: step4. sync should return OK.
450 */
451 std::map<std::string, DBStatus> result;
452 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
453 ASSERT_TRUE(status == OK);
454
455 /**
456 * @tc.expected: step4. onComplete should be called, DeviceA have {k1, v3}, {k2. v2}
457 */
458 ASSERT_TRUE(result.size() == devices.size());
459 for (const auto &pair : result) {
460 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
461 EXPECT_TRUE(pair.second == OK);
462 }
463
464 Value value5;
465 g_kvDelegatePtr->Get(key1, value5);
466 EXPECT_TRUE(value5 == value3);
467 g_kvDelegatePtr->Get(key2, value5);
468 EXPECT_TRUE(value5 == value2);
469 }
470
471 /**
472 * @tc.name: Normal Sync 006
473 * @tc.desc: Test normal pull sync for delete data.
474 * @tc.type: FUNC
475 * @tc.require:
476 * @tc.author: xushaohua
477 */
478 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync006, TestSize.Level2)
479 {
480 /**
481 * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
482 */
483 Key key1 = {'1'};
484 Value value1 = {'1'};
485 DBStatus status = g_kvDelegatePtr->Put(key1, value1);
486 ASSERT_TRUE(status == OK);
487 Key key2 = {'2'};
488 Value value2 = {'2'};
489 status = g_kvDelegatePtr->Put(key2, value2);
490 ASSERT_TRUE(status == OK);
491
492 /**
493 * @tc.steps: step2. deviceA put {k1, delete} t2, t2 <t1
494 */
495 Key hashKey1;
496 DistributedDBToolsUnitTest::CalcHash(key1, hashKey1);
497 g_deviceB->PutData(hashKey1, value1,
498 TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 1);
499
500 /**
501 * @tc.steps: step3. deviceA put {k1, delete} t3, t3 < t1
502 */
503 Key hashKey2;
504 DistributedDBToolsUnitTest::CalcHash(key2, hashKey2);
505 g_deviceC->PutData(hashKey2, value1,
506 TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 0);
507
508 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
509 /**
510 * @tc.steps: step4. deviceA call pull sync
511 * @tc.expected: step4. sync should return OK.
512 */
513 std::map<std::string, DBStatus> result;
514 std::vector<std::string> devices;
515 devices.push_back(g_deviceB->GetDeviceId());
516 devices.push_back(g_deviceC->GetDeviceId());
517 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
518 ASSERT_TRUE(status == OK);
519
520 /**
521 * @tc.expected: step4. onComplete should be called, DeviceA have {k2. v2} don't have k1
522 */
523 ASSERT_TRUE(result.size() == devices.size());
524 for (const auto &pair : result) {
525 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
526 EXPECT_TRUE(pair.second == OK);
527 }
528 Value value5;
529 g_kvDelegatePtr->Get(key1, value5);
530 EXPECT_TRUE(value5.empty());
531 g_kvDelegatePtr->Get(key2, value5);
532 EXPECT_TRUE(value5 == value2);
533 }
534
535 /**
536 * @tc.name: Normal Sync 007
537 * @tc.desc: Test normal push_pull sync for add data.
538 * @tc.type: FUNC
539 * @tc.require:
540 * @tc.author: xushaohua
541 */
542 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync007, TestSize.Level1)
543 {
544 DBStatus status = OK;
545 std::vector<std::string> devices;
546 devices.push_back(g_deviceB->GetDeviceId());
547 devices.push_back(g_deviceC->GetDeviceId());
548
549 /**
550 * @tc.steps: step1. deviceA put {k1, v1}
551 */
552 Key key1 = {'1'};
553 Value value1 = {'1'};
554 status = g_kvDelegatePtr->Put(key1, value1);
555 EXPECT_TRUE(status == OK);
556
557 /**
558 * @tc.steps: step1. deviceB put {k2, v2}
559 */
560 Key key2 = {'2'};
561 Value value2 = {'2'};
562 g_deviceB->PutData(key2, value2, 0, 0);
563
564 /**
565 * @tc.steps: step1. deviceB put {k3, v3}
566 */
567 Key key3 = {'3'};
568 Value value3 = {'3'};
569 g_deviceC->PutData(key3, value3, 0, 0);
570
571 /**
572 * @tc.steps: step4. deviceA call push_pull sync
573 * @tc.expected: step4. sync should return OK.
574 */
575 std::map<std::string, DBStatus> result;
576 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
577 ASSERT_TRUE(status == OK);
578
579 ASSERT_TRUE(result.size() == devices.size());
580 for (const auto &pair : result) {
581 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
582 EXPECT_TRUE(pair.second == OK);
583 }
584
585 /**
586 * @tc.expected: step4. onComplete should be called, DeviceA have {k1. v1}, {k2, v2}, {k3, v3}
587 * deviceB received {k1. v1}, don't received k3, deviceC received {k1. v1}, don't received k2
588 */
589 Value value4;
590 g_kvDelegatePtr->Get(key2, value4);
591 EXPECT_TRUE(value4 == value2);
592 g_kvDelegatePtr->Get(key3, value4);
593 EXPECT_TRUE(value4 == value3);
594
595 VirtualDataItem item1;
596 g_deviceB->GetData(key1, item1);
597 EXPECT_TRUE(item1.value == value1);
598 item1.value.clear();
599 g_deviceB->GetData(key3, item1);
600 EXPECT_TRUE(item1.value.empty());
601
602 VirtualDataItem item2;
603 g_deviceC->GetData(key1, item2);
604 EXPECT_TRUE(item2.value == value1);
605 item2.value.clear();
606 g_deviceC->GetData(key2, item2);
607 EXPECT_TRUE(item2.value.empty());
608 }
609
610 /**
611 * @tc.name: Normal Sync 008
612 * @tc.desc: Test normal push_pull sync for update data.
613 * @tc.type: FUNC
614 * @tc.require:
615 * @tc.author: xushaohua
616 */
617 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync008, TestSize.Level2)
618 {
619 DBStatus status = OK;
620 std::vector<std::string> devices;
621 devices.push_back(g_deviceB->GetDeviceId());
622 devices.push_back(g_deviceC->GetDeviceId());
623
624 /**
625 * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
626 */
627 Key key1 = {'1'};
628 Value value1 = {'1'};
629 status = g_kvDelegatePtr->Put(key1, value1);
630 ASSERT_TRUE(status == OK);
631
632 Key key2 = {'2'};
633 Value value2 = {'2'};
634 status = g_kvDelegatePtr->Put(key2, value2);
635 ASSERT_TRUE(status == OK);
636
637 /**
638 * @tc.steps: step2. deviceB put {k1, v3} t2, t2 > t1
639 */
640 Value value3 = {'3'};
641 g_deviceB->PutData(key1, value3,
642 TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 0);
643
644 /**
645 * @tc.steps: step3. deviceB put {k1, v4} t3, t4 <t1
646 */
647 Value value4 = {'4'};
648 g_deviceC->PutData(key2, value4,
649 TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 0);
650 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
651
652 /**
653 * @tc.steps: step4. deviceA call push_pull sync
654 * @tc.expected: step4. sync should return OK.
655 */
656 std::map<std::string, DBStatus> result;
657 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
658 ASSERT_TRUE(status == OK);
659 ASSERT_TRUE(result.size() == devices.size());
660 for (const auto &pair : result) {
661 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
662 EXPECT_TRUE(pair.second == OK);
663 }
664
665 /**
666 * @tc.expected: step4. onComplete should be called, DeviceA have {k1. v3}, {k2, v2}
667 * deviceB have {k1. v3}, deviceC have {k2. v2}
668 */
669 Value value5;
670 g_kvDelegatePtr->Get(key1, value5);
671 EXPECT_EQ(value5, value3);
672 g_kvDelegatePtr->Get(key2, value5);
673 EXPECT_EQ(value5, value2);
674
675 VirtualDataItem item1;
676 g_deviceB->GetData(key1, item1);
677 EXPECT_TRUE(item1.value == value3);
678 item1.value.clear();
679 g_deviceB->GetData(key2, item1);
680 EXPECT_TRUE(item1.value == value2);
681
682 VirtualDataItem item2;
683 g_deviceC->GetData(key2, item2);
684 EXPECT_TRUE(item2.value == value2);
685 }
686
687 /**
688 * @tc.name: Normal Sync 009
689 * @tc.desc: Test normal push_pull sync for delete data.
690 * @tc.type: FUNC
691 * @tc.require:
692 * @tc.author: xushaohua
693 */
694 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync009, TestSize.Level2)
695 {
696 DBStatus status = OK;
697 std::vector<std::string> devices;
698 devices.push_back(g_deviceB->GetDeviceId());
699 devices.push_back(g_deviceC->GetDeviceId());
700
701 /**
702 * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
703 */
704 Key key1 = {'1'};
705 Value value1 = {'1'};
706 status = g_kvDelegatePtr->Put(key1, value1);
707 ASSERT_TRUE(status == OK);
708
709 Key key2 = {'2'};
710 Value value2 = {'2'};
711 status = g_kvDelegatePtr->Put(key2, value2);
712 ASSERT_TRUE(status == OK);
713
714 /**
715 * @tc.steps: step2. deviceB put {k1, delete} t2, t2 > t1
716 */
717 Key hashKey1;
718 DistributedDBToolsUnitTest::CalcHash(key1, hashKey1);
719 g_deviceB->PutData(hashKey1, value1,
720 TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 1);
721
722 /**
723 * @tc.steps: step3. deviceB put {k1, delete} t3, t2 < t1
724 */
725 Key hashKey2;
726 DistributedDBToolsUnitTest::CalcHash(key2, hashKey2);
727 g_deviceC->PutData(hashKey2, value2,
728 TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 1);
729
730 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
731 /**
732 * @tc.steps: step4. deviceA call push_pull sync
733 * @tc.expected: step4. sync should return OK.
734 */
735 std::map<std::string, DBStatus> result;
736 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
737 ASSERT_TRUE(status == OK);
738
739 /**
740 * @tc.expected: step4. onComplete should be called, DeviceA have {k1. delete}, {k2, v2}
741 * deviceB have {k2. v2}, deviceC have {k2. v2}
742 */
743 ASSERT_TRUE(result.size() == devices.size());
744 for (const auto &pair : result) {
745 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
746 EXPECT_TRUE(pair.second == OK);
747 }
748
749 Value value3;
750 g_kvDelegatePtr->Get(key1, value3);
751 EXPECT_TRUE(value3.empty());
752 value3.clear();
753 g_kvDelegatePtr->Get(key2, value3);
754 EXPECT_TRUE(value3 == value2);
755
756 VirtualDataItem item1;
757 g_deviceB->GetData(key2, item1);
758 EXPECT_TRUE(item1.value == value2);
759
760 VirtualDataItem item2;
761 g_deviceC->GetData(key2, item2);
762 EXPECT_TRUE(item2.value == value2);
763 }
764
765 /**
766 * @tc.name: Normal Sync 010
767 * @tc.desc: Test sync failed by invalid devices.
768 * @tc.type: FUNC
769 * @tc.require:
770 * @tc.author: zhangqiquan
771 */
772 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync010, TestSize.Level1)
773 {
774 DBStatus status = OK;
775 std::vector<std::string> devices;
776 std::string invalidDev = std::string(DBConstant::MAX_DEV_LENGTH + 1, '0');
777 devices.push_back(DEVICE_A);
778 devices.push_back(g_deviceB->GetDeviceId());
779 devices.push_back(invalidDev);
780
781 std::map<std::string, DBStatus> result;
782 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
783 ASSERT_TRUE(status == OK);
784
785 ASSERT_EQ(result.size(), devices.size());
786 EXPECT_EQ(result[DEVICE_A], INVALID_ARGS);
787 EXPECT_EQ(result[invalidDev], INVALID_ARGS);
788 EXPECT_EQ(result[DEVICE_B], OK);
789 }
790
791 /**
792 * @tc.name: Normal Sync 011
793 * @tc.desc: Test sync with translated id.
794 * @tc.type: FUNC
795 * @tc.require:
796 * @tc.author: zhangqiquan
797 */
798 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync011, TestSize.Level0)
799 {
800 DBStatus status = OK;
801 std::vector<std::string> devices;
802 devices.push_back(g_deviceB->GetDeviceId());
803
804 /**
805 * @tc.steps: step1. deviceA put {k1, v1}
806 */
807 Key key = {'1'};
808 Value value = {'1'};
809 status = g_kvDelegatePtr->Put(key, value);
810 ASSERT_TRUE(status == OK);
811
812 /**
813 * @tc.steps: step2. ori dev will be append test
814 * @tc.expected: step2. sync should return OK.
815 */
__anon2fced0530302(const std::string &oriDevId, const StoreInfo &) 816 RuntimeConfig::SetTranslateToDeviceIdCallback([](const std::string &oriDevId, const StoreInfo &) {
817 std::string dev = oriDevId + "test";
818 LOGI("translate %s to %s", oriDevId.c_str(), dev.c_str());
819 return dev;
820 });
821 std::map<std::string, DBStatus> result;
822 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
823 ASSERT_TRUE(status == OK);
824 RuntimeConfig::SetTranslateToDeviceIdCallback(nullptr);
825
826 /**
827 * @tc.expected: step2. onComplete should be called, Send watermark should not be zero
828 */
829 ASSERT_TRUE(result.size() == devices.size());
830 for (const auto &pair : result) {
831 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
832 EXPECT_TRUE(pair.second == OK);
833 }
834 WatermarkInfo info;
835 CheckWatermark(g_deviceB->GetDeviceId(), g_kvDelegatePtr, info, false);
836 info = {};
837 std::string checkDev = g_deviceB->GetDeviceId() + "test";
838 CheckWatermark(checkDev, g_kvDelegatePtr, info, false);
839 }
840
841 /**
842 * @tc.name: Normal Sync 012
843 * @tc.desc: Test sync with max data.
844 * @tc.type: FUNC
845 * @tc.require:
846 * @tc.author: wangxiangdong
847 */
848 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync012, TestSize.Level0)
849 {
850 /**
851 * @tc.steps: step1. prepare env
852 */
853 DBStatus status = OK;
854 std::vector<std::string> devices;
855 devices.push_back(g_deviceB->GetDeviceId());
856 uint32_t maxValueSize = 64 * 1024 * 1024;
857 Value maxValue;
858 DistributedDBToolsUnitTest::GetRandomKeyValue(maxValue, maxValueSize); // 64M
859 PragmaData input = static_cast<PragmaData>(&maxValueSize);
860 status = g_kvDelegatePtr->Pragma(SET_MAX_VALUE_SIZE, input);
861 /**
862 * @tc.steps: step2. deviceA put {k1, v1}
863 */
864 Key key = {'1'};
865 status = g_kvDelegatePtr->Put(key, maxValue);
866 Key key2 = {'2'};
867 Value value2 = {'2'};
868 status = g_kvDelegatePtr->Put(key2, value2);
869 ASSERT_TRUE(status == OK);
870
871 /**
872 * @tc.steps: step3. ori dev will be append test
873 * @tc.expected: step3. sync should return OK.
874 */
__anon2fced0530402(const std::string &oriDevId, const StoreInfo &) 875 RuntimeConfig::SetTranslateToDeviceIdCallback([](const std::string &oriDevId, const StoreInfo &) {
876 std::string dev = oriDevId + "test";
877 LOGI("translate %s to %s", oriDevId.c_str(), dev.c_str());
878 return dev;
879 });
880 std::map<std::string, DBStatus> result;
881 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
882 ASSERT_TRUE(status == OK);
883 RuntimeConfig::SetTranslateToDeviceIdCallback(nullptr);
884
885 /**
886 * @tc.expected: step4. onComplete should be called, deviceB should get same data
887 */
888 ASSERT_TRUE(result.size() == devices.size());
889 for (const auto &pair : result) {
890 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
891 EXPECT_TRUE(pair.second == OK);
892 }
893 VirtualDataItem item;
894 g_deviceB->GetData(key, item);
895 EXPECT_TRUE(item.value == maxValue);
896 VirtualDataItem item2;
897 g_deviceB->GetData(key2, item2);
898 EXPECT_TRUE(item2.value == value2);
899 }
900
901 /**
902 * @tc.name: Limit Data Sync 001
903 * @tc.desc: Test sync limit key and value data
904 * @tc.type: FUNC
905 * @tc.require:
906 * @tc.author: xushaohua
907 */
908 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, LimitDataSync001, TestSize.Level1)
909 {
910 DBStatus status = OK;
911 std::vector<std::string> devices;
912 devices.push_back(g_deviceB->GetDeviceId());
913
914 Key key1;
915 Value value1;
916 DistributedDBToolsUnitTest::GetRandomKeyValue(key1, DBConstant::MAX_KEY_SIZE + 1);
917 DistributedDBToolsUnitTest::GetRandomKeyValue(value1, DBConstant::MAX_VALUE_SIZE + 1);
918
919 Key key2;
920 Value value2;
921 DistributedDBToolsUnitTest::GetRandomKeyValue(key2, DBConstant::MAX_KEY_SIZE);
922 DistributedDBToolsUnitTest::GetRandomKeyValue(value2, DBConstant::MAX_VALUE_SIZE);
923
924 /**
925 * @tc.steps: step1. deviceB put {k1, v1}, K1 > 1k, v1 > 4M
926 */
927 g_deviceB->PutData(key1, value1, 0, 0);
928
929 /**
930 * @tc.steps: step2. deviceB put {k2, v2}, K2 = 1k, v2 = 4M
931 */
932 g_deviceC->PutData(key2, value2, 0, 0);
933
934 /**
935 * @tc.steps: step3. deviceA call pull sync from device B
936 * @tc.expected: step3. sync should return OK.
937 */
938 std::map<std::string, DBStatus> result;
939 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
940 ASSERT_TRUE(status == OK);
941
942 /**
943 * @tc.expected: step3. onComplete should be called.
944 */
945 ASSERT_TRUE(result.size() == devices.size());
946 for (const auto &pair : result) {
947 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
948 if (pair.first == g_deviceB->GetDeviceId()) {
949 EXPECT_TRUE(pair.second != OK);
950 } else {
951 EXPECT_TRUE(pair.second == OK);
952 }
953 }
954
955 /**
956 * @tc.steps: step4. deviceA call pull sync from deviceC
957 * @tc.expected: step4. sync should return OK.
958 */
959 devices.clear();
960 result.clear();
961 devices.push_back(g_deviceC->GetDeviceId());
962 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
963 ASSERT_TRUE(status == OK);
964
965 /**
966 * @tc.expected: step4. onComplete should be called, DeviceA have {k2. v2}, don't have {k1, v1}
967 */
968 ASSERT_TRUE(result.size() == devices.size());
969 for (const auto &pair : result) {
970 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
971 EXPECT_TRUE(pair.second == OK);
972 }
973
974 // Get value from A
975 Value valueRead;
976 EXPECT_TRUE(g_kvDelegatePtr->Get(key1, valueRead) != OK);
977 valueRead.clear();
978 EXPECT_EQ(g_kvDelegatePtr->Get(key2, valueRead), OK);
979 EXPECT_TRUE(valueRead == value2);
980 }
981
982 /**
983 * @tc.name: Limit Data Sync 002
984 * @tc.desc: Test PutBatch with invalid entries and then call sync.
985 * @tc.type: FUNC
986 * @tc.require:
987 * @tc.author: mazhao
988 */
989 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, LimitDataSync002, TestSize.Level1)
990 {
991 DBStatus status = OK;
992 std::vector<std::string> devices;
993 devices.push_back(g_deviceB->GetDeviceId());
994 devices.push_back(g_deviceC->GetDeviceId());
995 Key legalKey;
996 DistributedDBToolsUnitTest::GetRandomKeyValue(legalKey, DBConstant::MAX_KEY_SIZE); // 1K
997 Value legalValue;
998 DistributedDBToolsUnitTest::GetRandomKeyValue(legalValue, DBConstant::MAX_VALUE_SIZE); // 4M
999 Value emptyValue; // 0k
1000 vector<Entry> illegalEntrys; // size is 512M + 1KB
1001 for (int i = 0; i < 127; i++) { // 127 * (legalValue + legalKey) is equal to 508M + 127KB < 512M.
1002 illegalEntrys.push_back({legalKey, legalValue});
1003 }
1004 for (int i = 0; i < 3970; i++) { // 3970 * legalKey is equal to 3970KB.
1005 illegalEntrys.push_back({legalKey, emptyValue});
1006 }
1007 /**
1008 * @tc.steps: step1. PutBatch with invalid entries inside which total length of the key and valud is more than 512M
1009 * @tc.expected: step1. PutBatch should return INVALID_ARGS.
1010 */
1011 EXPECT_EQ(g_kvDelegatePtr->PutBatch(illegalEntrys), INVALID_ARGS);
1012 /**
1013 * @tc.steps: step2. deviceA call push_pull sync
1014 * @tc.expected: step2. sync return OK and all statuses is OK.
1015 */
1016 std::map<std::string, DBStatus> result;
1017 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
1018 ASSERT_TRUE(status == OK);
1019 ASSERT_TRUE(result.size() == devices.size());
1020 for (const auto &pair : result) {
1021 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1022 printf("dev %s, status %d", pair.first.c_str(), pair.second);
1023 EXPECT_TRUE(pair.second == OK);
1024 }
1025 }
1026
1027 /**
1028 * @tc.name: Auto Sync 001
1029 * @tc.desc: Verify auto sync enable function.
1030 * @tc.type: FUNC
1031 * @tc.require:
1032 * @tc.author: xushaohua
1033 */
1034 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, AutoSync001, TestSize.Level1)
1035 {
1036 std::vector<std::string> devices;
1037 devices.push_back(g_deviceB->GetDeviceId());
1038 devices.push_back(g_deviceC->GetDeviceId());
1039
1040 /**
1041 * @tc.steps: step1. enable auto sync
1042 * @tc.expected: step1, Pragma return OK.
1043 */
1044 bool autoSync = true;
1045 PragmaData data = static_cast<PragmaData>(&autoSync);
1046 DBStatus status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1047 ASSERT_EQ(status, OK);
1048
1049 /**
1050 * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
1051 */
1052 ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
1053 ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_2, VALUE_2) == OK);
1054
1055 /**
1056 * @tc.steps: step3. sleep for data sync
1057 * @tc.expected: step3. deviceB,C has {k1, v1}, {k2, v2}
1058 */
1059 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1060 VirtualDataItem item;
1061 g_deviceB->GetData(KEY_1, item);
1062 EXPECT_EQ(item.value, VALUE_1);
1063 g_deviceB->GetData(KEY_2, item);
1064 EXPECT_EQ(item.value, VALUE_2);
1065 g_deviceC->GetData(KEY_1, item);
1066 EXPECT_EQ(item.value, VALUE_1);
1067 g_deviceC->GetData(KEY_2, item);
1068 EXPECT_EQ(item.value, VALUE_2);
1069 }
1070
1071 /**
1072 * @tc.name: Auto Sync 002
1073 * @tc.desc: Verify auto sync disable function.
1074 * @tc.type: FUNC
1075 * @tc.require:
1076 * @tc.author: xushaohua
1077 */
1078 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, AutoSync002, TestSize.Level1)
1079 {
1080 std::vector<std::string> devices;
1081 devices.push_back(g_deviceB->GetDeviceId());
1082 devices.push_back(g_deviceC->GetDeviceId());
1083
1084 /**
1085 * @tc.steps: step1. disable auto sync
1086 * @tc.expected: step1, Pragma return OK.
1087 */
1088 bool autoSync = false;
1089 PragmaData data = static_cast<PragmaData>(&autoSync);
1090 DBStatus status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1091 ASSERT_EQ(status, OK);
1092
1093 /**
1094 * @tc.steps: step2. deviceB put {k1, v1}, deviceC put {k2, v2}
1095 */
1096 g_deviceB->PutData(KEY_1, VALUE_1, 0, 0);
1097 g_deviceC->PutData(KEY_2, VALUE_2, 0, 0);
1098
1099 /**
1100 * @tc.steps: step3. sleep for data sync
1101 * @tc.expected: step3. deviceA don't have k1, k2.
1102 */
1103 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1104 Value value3;
1105 EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_1, value3) == NOT_FOUND);
1106 EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_2, value3) == NOT_FOUND);
1107 }
1108
1109 /**
1110 * @tc.name: Block Sync 001
1111 * @tc.desc: Verify block push sync function.
1112 * @tc.type: FUNC
1113 * @tc.require:
1114 * @tc.author: xushaohua
1115 */
1116 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync001, TestSize.Level1)
1117 {
1118 std::vector<std::string> devices;
1119 devices.push_back(g_deviceB->GetDeviceId());
1120 devices.push_back(g_deviceC->GetDeviceId());
1121
1122 /**
1123 * @tc.steps: step1. deviceA put {k1, v1}
1124 */
1125 g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1126
1127 /**
1128 * @tc.steps: step2. deviceA call block push sync to deviceB & deviceC.
1129 * @tc.expected: step2. Sync return OK, devices status OK, deviceB & deivceC has {k1, v1}.
1130 */
1131 std::map<std::string, DBStatus> result;
1132 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1133 ASSERT_EQ(status, OK);
1134 ASSERT_TRUE(result.size() == devices.size());
1135 for (const auto &pair : result) {
1136 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1137 EXPECT_TRUE(pair.second == OK);
1138 }
1139 VirtualDataItem item1;
1140 EXPECT_EQ(g_deviceB->GetData(KEY_1, item1), E_OK);
1141 EXPECT_EQ(item1.value, VALUE_1);
1142 VirtualDataItem item2;
1143 EXPECT_EQ(g_deviceC->GetData(KEY_1, item2), E_OK);
1144 EXPECT_EQ(item2.value, VALUE_1);
1145 }
1146
1147 /**
1148 * @tc.name: Block Sync 002
1149 * @tc.desc: Verify block pull sync function.
1150 * @tc.type: FUNC
1151 * @tc.require:
1152 * @tc.author: xushaohua
1153 */
1154 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync002, TestSize.Level1)
1155 {
1156 std::vector<std::string> devices;
1157 devices.push_back(g_deviceB->GetDeviceId());
1158 devices.push_back(g_deviceC->GetDeviceId());
1159
1160 /**
1161 * @tc.steps: step1. deviceB put {k1, v1}, deviceC put {k2, v2}
1162 */
1163 g_deviceB->PutData(KEY_1, VALUE_1, 0, 0);
1164 g_deviceC->PutData(KEY_2, VALUE_2, 0, 0);
1165
1166 /**
1167 * @tc.steps: step2. deviceA call block pull and pull sync to deviceB & deviceC.
1168 * @tc.expected: step2. Sync return OK, devices status OK, deviceA has {k1, v1}, {k2, v2}
1169 */
1170 std::map<std::string, DBStatus> result;
1171 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1172 ASSERT_EQ(status, OK);
1173 ASSERT_TRUE(result.size() == devices.size());
1174 for (const auto &pair : result) {
1175 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1176 EXPECT_TRUE(pair.second == OK);
1177 }
1178 Value value3;
1179 EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_1, value3) == OK);
1180 EXPECT_TRUE(value3 == VALUE_1);
1181 EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_2, value3) == OK);
1182 EXPECT_TRUE(value3 == VALUE_2);
1183 }
1184
1185 /**
1186 * @tc.name: Block Sync 003
1187 * @tc.desc: Verify block push and pull sync function.
1188 * @tc.type: FUNC
1189 * @tc.require:
1190 * @tc.author: xushaohua
1191 */
1192 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync003, TestSize.Level1)
1193 {
1194 std::vector<std::string> devices;
1195 devices.push_back(g_deviceB->GetDeviceId());
1196 devices.push_back(g_deviceC->GetDeviceId());
1197
1198 /**
1199 * @tc.steps: step1. deviceA put {k1, v1}
1200 */
1201 g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1202
1203 /**
1204 * @tc.steps: step2. deviceB put {k1, v1}, deviceB put {k2, v2}
1205 */
1206 g_deviceB->PutData(KEY_2, VALUE_2, 0, 0);
1207 g_deviceC->PutData(KEY_3, VALUE_3, 0, 0);
1208
1209 /**
1210 * @tc.steps: step3. deviceA call block pull and pull sync to deviceB & deviceC.
1211 * @tc.expected: step3. Sync return OK, devices status OK, deviceA has {k1, v1}, {k2, v2} {k3, v3}
1212 * deviceB has {k1, v1}, {k2. v2} , deviceC has {k1, v1}, {k3, v3}
1213 */
1214 std::map<std::string, DBStatus> result;
1215 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, true);
1216 ASSERT_EQ(status, OK);
1217 ASSERT_TRUE(result.size() == devices.size());
1218 for (const auto &pair : result) {
1219 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1220 EXPECT_TRUE(pair.second == OK);
1221 }
1222
1223 VirtualDataItem item1;
1224 g_deviceB->GetData(KEY_1, item1);
1225 EXPECT_TRUE(item1.value == VALUE_1);
1226 g_deviceB->GetData(KEY_2, item1);
1227 EXPECT_TRUE(item1.value == VALUE_2);
1228
1229 VirtualDataItem item2;
1230 g_deviceC->GetData(KEY_1, item2);
1231 EXPECT_TRUE(item2.value == VALUE_1);
1232 g_deviceC->GetData(KEY_3, item2);
1233 EXPECT_TRUE(item2.value == VALUE_3);
1234
1235 Value value3;
1236 EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_1, value3) == OK);
1237 EXPECT_TRUE(value3 == VALUE_1);
1238 EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_2, value3) == OK);
1239 EXPECT_TRUE(value3 == VALUE_2);
1240 EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_3, value3) == OK);
1241 EXPECT_TRUE(value3 == VALUE_3);
1242 }
1243
1244 /**
1245 * @tc.name: Block Sync 004
1246 * @tc.desc: Verify block sync function invalid args.
1247 * @tc.type: FUNC
1248 * @tc.require:
1249 * @tc.author: xushaohua
1250 */
1251 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync004, TestSize.Level2)
1252 {
1253 std::vector<std::string> devices;
1254
1255 /**
1256 * @tc.steps: step1. deviceA put {k1, v1}
1257 */
1258 g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1259
1260 /**
1261 * @tc.steps: step2. deviceA call block push sync to deviceB & deviceC.
1262 * @tc.expected: step2. Sync return INVALID_ARGS
1263 */
1264 std::map<std::string, DBStatus> result;
1265 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1266 EXPECT_EQ(status, INVALID_ARGS);
1267
1268 /**
1269 * @tc.steps: step3. deviceB, deviceC offlinem and push deviceA sync to deviceB and deviceC.
1270 * @tc.expected: step3. Sync return OK, but the deviceB and deviceC are TIME_OUT
1271 */
1272 devices.push_back(g_deviceB->GetDeviceId());
1273 devices.push_back(g_deviceC->GetDeviceId());
1274 g_deviceB->Offline();
1275 g_deviceC->Offline();
1276
1277 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1278 EXPECT_EQ(status, OK);
1279 ASSERT_TRUE(result.size() == devices.size());
1280 for (const auto &pair : result) {
1281 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1282 // If syncTaskContext of deviceB is scheduled to be executed first, ClearAllSyncTask is
1283 // invoked when OfflineHandleByDevice is triggered, and SyncOperation::Finished() is triggered in advance.
1284 // The returned status is COMM_FAILURE
1285 EXPECT_TRUE((pair.second == static_cast<DBStatus>(-E_PERIPHERAL_INTERFACE_FAIL)) ||
1286 (pair.second == COMM_FAILURE));
1287 }
1288 }
1289
1290 /**
1291 * @tc.name: Block Sync 005
1292 * @tc.desc: Verify block sync function busy.
1293 * @tc.type: FUNC
1294 * @tc.require:
1295 * @tc.author: xushaohua
1296 */
1297 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync005, TestSize.Level2)
1298 {
1299 std::vector<std::string> devices;
1300 devices.push_back(g_deviceB->GetDeviceId());
1301 devices.push_back(g_deviceC->GetDeviceId());
1302 /**
1303 * @tc.steps: step1. deviceA put {k1, v1}
1304 */
1305 g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1306
1307 /**
1308 * @tc.steps: step2. New a thread to deviceA call block push sync to deviceB & deviceC,
1309 * but deviceB & C is blocked
1310 * @tc.expected: step2. Sync will be blocked util timeout, and then return OK
1311 */
1312 g_deviceB->Offline();
1313 g_deviceC->Offline();
__anon2fced0530502() 1314 thread thread([devices]() {
1315 std::map<std::string, DBStatus> resultInner;
1316 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, resultInner, true);
1317 EXPECT_EQ(status, OK);
1318 });
1319 thread.detach();
1320 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1321 /**
1322 * @tc.steps: step3. sleep 1s and call sync.
1323 * @tc.expected: step3. Sync will return BUSY.
1324 */
1325 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1326 std::map<std::string, DBStatus> result;
1327 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, true);
1328 EXPECT_EQ(status, OK);
1329 }
1330
1331 /**
1332 * @tc.name: SyncQueue001
1333 * @tc.desc: Invalid args check of Pragma GET_QUEUED_SYNC_SIZE SET_QUEUED_SYNC_LIMIT and
1334 * GET_QUEUED_SYNC_LIMIT, expect return INVALID_ARGS.
1335 * @tc.type: FUNC
1336 * @tc.require:
1337 * @tc.author: wangchuanqing
1338 */
1339 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue001, TestSize.Level3)
1340 {
1341 /**
1342 * @tc.steps:step1. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE, and set param to be null
1343 * @tc.expected: step1. Expect return INVALID_ARGS.
1344 */
1345 int *param = nullptr;
1346 PragmaData input = static_cast<PragmaData>(param);
1347 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), INVALID_ARGS);
1348
1349 /**
1350 * @tc.steps:step2. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be null
1351 * @tc.expected: step2. Expect return INVALID_ARGS.
1352 */
1353 input = static_cast<PragmaData>(param);
1354 EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1355
1356 /**
1357 * @tc.steps:step3. Set PragmaCmd to be GET_QUEUED_SYNC_LIMIT, and set param to be null
1358 * @tc.expected: step3. Expect return INVALID_ARGS.
1359 */
1360 input = static_cast<PragmaData>(param);
1361 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1362
1363 /**
1364 * @tc.steps:step4. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be QUEUED_SYNC_LIMIT_MIN - 1
1365 * @tc.expected: step4. Expect return INVALID_ARGS.
1366 */
1367 int limit = DBConstant::QUEUED_SYNC_LIMIT_MIN - 1;
1368 input = static_cast<PragmaData>(&limit);
1369 EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1370
1371 /**
1372 * @tc.steps:step5. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be QUEUED_SYNC_LIMIT_MAX + 1
1373 * @tc.expected: step5. Expect return INVALID_ARGS.
1374 */
1375 limit = DBConstant::QUEUED_SYNC_LIMIT_MAX + 1;
1376 input = static_cast<PragmaData>(&limit);
1377 EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1378 }
1379
1380 /**
1381 * @tc.name: SyncQueue002
1382 * @tc.desc: Pragma GET_QUEUED_SYNC_LIMIT and SET_QUEUED_SYNC_LIMIT
1383 * @tc.type: FUNC
1384 * @tc.require:
1385 * @tc.author: wangchuanqing
1386 */
1387 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue002, TestSize.Level3)
1388 {
1389 /**
1390 * @tc.steps:step1. Set PragmaCmd to be GET_QUEUED_SYNC_LIMIT,
1391 * @tc.expected: step1. Expect return OK, limit eq QUEUED_SYNC_LIMIT_DEFAULT.
1392 */
1393 int limit = 0;
1394 PragmaData input = static_cast<PragmaData>(&limit);
1395 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), OK);
1396 EXPECT_EQ(limit, DBConstant::QUEUED_SYNC_LIMIT_DEFAULT);
1397
1398 /**
1399 * @tc.steps:step2. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be 50
1400 * @tc.expected: step2. Expect return OK.
1401 */
1402 limit = 50;
1403 input = static_cast<PragmaData>(&limit);
1404 EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), OK);
1405
1406 /**
1407 * @tc.steps:step3. Set PragmaCmd to be GET_QUEUED_SYNC_LIMIT,
1408 * @tc.expected: step3. Expect return OK, limit eq 50
1409 */
1410 limit = 0;
1411 input = static_cast<PragmaData>(&limit);
1412 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), OK);
1413 EXPECT_EQ(limit, 50);
1414 }
1415
1416 /**
1417 * @tc.name: SyncQueue003
1418 * @tc.desc: sync queue test
1419 * @tc.type: FUNC
1420 * @tc.require:
1421 * @tc.author: wangchuanqing
1422 */
1423 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue003, TestSize.Level3)
1424 {
1425 DBStatus status = OK;
1426 std::vector<std::string> devices;
1427 devices.push_back(g_deviceB->GetDeviceId());
1428 devices.push_back(g_deviceC->GetDeviceId());
1429
1430 /**
1431 * @tc.steps:step1. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE,
1432 * @tc.expected: step1. Expect return OK, size eq 0.
1433 */
1434 int size;
1435 PragmaData input = static_cast<PragmaData>(&size);
1436 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), OK);
1437 EXPECT_EQ(size, 0);
1438
1439 /**
1440 * @tc.steps:step2. deviceA put {k1, v1}
1441 */
1442 status = g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1443 ASSERT_TRUE(status == OK);
1444
1445 /**
1446 * @tc.steps:step3. deviceA sync SYNC_MODE_PUSH_ONLY
1447 */
1448 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1449 ASSERT_TRUE(status == OK);
1450
1451 /**
1452 * @tc.steps:step4. deviceA put {k2, v2}
1453 */
1454 status = g_kvDelegatePtr->Put(KEY_2, VALUE_2);
1455 ASSERT_TRUE(status == OK);
1456
1457 /**
1458 * @tc.steps:step5. deviceA sync SYNC_MODE_PUSH_ONLY
1459 */
1460 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1461 ASSERT_TRUE(status == OK);
1462
1463 /**
1464 * @tc.steps:step6. deviceB put {k3, v3}
1465 */
1466 g_deviceB->PutData(KEY_3, VALUE_3, 0, 0);
1467
1468 /**
1469 * @tc.steps:step7. deviceA put {k4, v4}
1470 */
1471 status = g_kvDelegatePtr->Put(KEY_4, VALUE_4);
1472 ASSERT_TRUE(status == OK);
1473
1474 /**
1475 * @tc.steps:step8. deviceA sync SYNC_MODE_PUSH_PULL
1476 */
1477 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_PULL, nullptr, false);
1478 ASSERT_TRUE(status == OK);
1479
1480 /**
1481 * @tc.steps:step9. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE,
1482 * @tc.expected: step1. Expect return OK, 0 <= size <= 4
1483 */
1484 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), OK);
1485 ASSERT_TRUE((size >= 0) && (size <= 4));
1486
1487 /**
1488 * @tc.steps:step10. deviceB put {k5, v5}
1489 */
1490 g_deviceB->PutData(KEY_5, VALUE_5, 0, 0);
1491
1492 /**
1493 * @tc.steps:step11. deviceA call sync and wait
1494 * @tc.expected: step11. sync should return OK.
1495 */
1496 std::map<std::string, DBStatus> result;
1497 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
1498 ASSERT_TRUE(status == OK);
1499
1500 /**
1501 * @tc.expected: step11. onComplete should be called, DeviceA,B,C have {k1,v1}~ {KEY_5,VALUE_5}
1502 */
1503 ASSERT_TRUE(result.size() == devices.size());
1504 for (const auto &pair : result) {
1505 EXPECT_TRUE(pair.second == OK);
1506 }
1507 VirtualDataItem item;
1508 g_deviceB->GetData(KEY_1, item);
1509 EXPECT_TRUE(item.value == VALUE_1);
1510 g_deviceB->GetData(KEY_2, item);
1511 EXPECT_TRUE(item.value == VALUE_2);
1512 g_deviceB->GetData(KEY_3, item);
1513 EXPECT_TRUE(item.value == VALUE_3);
1514 g_deviceB->GetData(KEY_4, item);
1515 EXPECT_TRUE(item.value == VALUE_4);
1516 g_deviceB->GetData(KEY_5, item);
1517 EXPECT_TRUE(item.value == VALUE_5);
1518 Value value;
1519 EXPECT_EQ(g_kvDelegatePtr->Get(KEY_3, value), OK);
1520 EXPECT_EQ(VALUE_3, value);
1521 EXPECT_EQ(g_kvDelegatePtr->Get(KEY_5, value), OK);
1522 EXPECT_EQ(VALUE_5, value);
1523 }
1524
1525 /**
1526 * @tc.name: SyncQueue004
1527 * @tc.desc: sync queue full test
1528 * @tc.type: FUNC
1529 * @tc.require:
1530 * @tc.author: wangchuanqing
1531 */
1532 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue004, TestSize.Level3)
1533 {
1534 DBStatus status = OK;
1535 std::vector<std::string> devices;
1536 devices.push_back(g_deviceB->GetDeviceId());
1537 devices.push_back(g_deviceC->GetDeviceId());
1538
1539 /**
1540 * @tc.steps:step1. deviceB C block
1541 */
1542 g_communicatorAggregator->SetBlockValue(true);
1543
1544 /**
1545 * @tc.steps:step2. deviceA put {k1, v1}
1546 */
1547 status = g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1548 ASSERT_TRUE(status == OK);
1549
1550 /**
1551 * @tc.steps:step3. deviceA sync QUEUED_SYNC_LIMIT_DEFAULT times
1552 * @tc.expected: step3. Expect return OK
1553 */
1554 for (int i = 0; i < DBConstant::QUEUED_SYNC_LIMIT_DEFAULT; i++) {
1555 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1556 ASSERT_TRUE(status == OK);
1557 }
1558
1559 /**
1560 * @tc.steps:step4. deviceA sync
1561 * @tc.expected: step4. Expect return BUSY
1562 */
1563 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1564 ASSERT_TRUE(status == BUSY);
1565 g_communicatorAggregator->SetBlockValue(false);
1566 }
1567
1568 /**
1569 * @tc.name: SyncQueue005
1570 * @tc.desc: block sync queue test
1571 * @tc.type: FUNC
1572 * @tc.require:
1573 * @tc.author: wangchuanqing
1574 */
1575 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue005, TestSize.Level3)
1576 {
1577 std::vector<std::string> devices;
1578 devices.push_back(g_deviceB->GetDeviceId());
1579 devices.push_back(g_deviceC->GetDeviceId());
1580 /**
1581 * @tc.steps:step1. New a thread to deviceA call block push sync to deviceB & deviceC,
1582 * but deviceB & C is offline
1583 * @tc.expected: step1. Sync will be blocked util timeout, and then return OK
1584 */
1585 g_deviceB->Offline();
1586 g_deviceC->Offline();
1587 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1588
1589 /**
1590 * @tc.steps:step2. deviceA put {k1, v1}
1591 */
1592 g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1593
1594 std::mutex lockMutex;
1595 std::condition_variable conditionVar;
1596
__anon2fced0530602() 1597 std::thread threadFirst([devices]() {
1598 std::map<std::string, DBStatus> resultInner;
1599 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, resultInner, true);
1600 EXPECT_EQ(status, OK);
1601 });
1602 threadFirst.detach();
1603 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1604 /**
1605 * @tc.steps:step3. New a thread to deviceA call block push sync to deviceB & deviceC,
1606 * but deviceB & C is offline
1607 * @tc.expected: step2. Sync will be blocked util timeout, and then return OK
1608 */
__anon2fced0530702() 1609 std::thread threadSecond([devices, &lockMutex, &conditionVar]() {
1610 std::map<std::string, DBStatus> resultInner;
1611 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, resultInner, true);
1612 EXPECT_EQ(status, OK);
1613 std::unique_lock<mutex> lockInner(lockMutex);
1614 conditionVar.notify_one();
1615 });
1616 threadSecond.detach();
1617
1618 /**
1619 * @tc.steps:step4. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE,
1620 * @tc.expected: step1. Expect return OK, size eq 0.
1621 */
1622 int size;
1623 PragmaData input = static_cast<PragmaData>(&size);
1624 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), OK);
1625 EXPECT_EQ(size, 0);
1626
1627 /**
1628 * @tc.steps:step5. wait exit
1629 */
1630 std::unique_lock<mutex> lock(lockMutex);
1631 auto now = std::chrono::system_clock::now();
1632 conditionVar.wait_until(lock, now + 2 * INT8_MAX * 1000ms);
1633 }
1634
1635 /**
1636 * @tc.name: CalculateSyncData001
1637 * @tc.desc: Test sync data whose device never synced before
1638 * @tc.type: FUNC
1639 * @tc.require:
1640 * @tc.author: zhuwentao
1641 */
1642 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData001, TestSize.Level3)
1643 {
1644 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1645 size_t dataSize = g_kvDelegatePtr->GetSyncDataSize(DEVICE_B);
1646 uint32_t serialHeadLen = 8u;
1647 EXPECT_EQ(static_cast<uint32_t>(dataSize), 0u + serialHeadLen);
1648 uint32_t keySize = 256u;
1649 uint32_t valuesize = 1024u;
1650 uint32_t itemCount = 10u;
1651 CalculateDataTest(itemCount, keySize, valuesize);
1652 }
1653
1654 /**
1655 * @tc.name: CalculateSyncData002
1656 * @tc.desc: Test sync data whose device synced before, but sync data is less than 1M
1657 * @tc.type: FUNC
1658 * @tc.require:
1659 * @tc.author: zhuwentao
1660 */
1661 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData002, TestSize.Level3)
1662 {
1663 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1664 Key key1 = {'1'};
1665 Value value1 = {'1'};
1666 EXPECT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
1667
1668 std::vector<std::string> devices;
1669 devices.push_back(g_deviceB->GetDeviceId());
1670 std::map<std::string, DBStatus> result;
1671 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1672 ASSERT_TRUE(status == OK);
1673 ASSERT_TRUE(result.size() == devices.size());
1674 for (const auto &pair : result) {
1675 EXPECT_TRUE(pair.second == OK);
1676 }
1677
1678 uint32_t keySize = 256u;
1679 uint32_t valuesize = 512u;
1680 uint32_t itemCount = 20u;
1681 CalculateDataTest(itemCount, keySize, valuesize);
1682 }
1683
1684 /**
1685 * @tc.name: CalculateSyncData003
1686 * @tc.desc: Test sync data whose device synced before, but sync data is larger than 1M
1687 * @tc.type: FUNC
1688 * @tc.require:
1689 * @tc.author: zhuwentao
1690 */
1691 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData003, TestSize.Level3)
1692 {
1693 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1694 Key key1 = {'1'};
1695 Value value1 = {'1'};
1696 EXPECT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
1697
1698 std::vector<std::string> devices;
1699 devices.push_back(g_deviceB->GetDeviceId());
1700 std::map<std::string, DBStatus> result;
1701 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1702 ASSERT_TRUE(status == OK);
1703 ASSERT_TRUE(result.size() == devices.size());
1704 for (const auto &pair : result) {
1705 EXPECT_TRUE(pair.second == OK);
1706 }
1707 uint32_t keySize = 256u;
1708 uint32_t valuesize = 1024u;
1709 uint32_t itemCount = 2048u;
1710 CalculateDataTest(itemCount, keySize, valuesize);
1711 }
1712
1713 /**
1714 * @tc.name: CalculateSyncData004
1715 * @tc.desc: Test invalid device when call GetSyncDataSize interface
1716 * @tc.type: FUNC
1717 * @tc.require:
1718 * @tc.author: zhuwentao
1719 */
1720 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData004, TestSize.Level3)
1721 {
1722 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1723 std::string device;
1724 EXPECT_EQ(g_kvDelegatePtr->GetSyncDataSize(device), 0u);
1725 }
1726
1727 /**
1728 * @tc.name: CalculateSyncData005
1729 * @tc.desc: Test CalculateSyncData and rekey Concurrently
1730 * @tc.type: FUNC
1731 * @tc.require:
1732 * @tc.author: zhuwentao
1733 */
1734 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData005, TestSize.Level3)
1735 {
1736 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1737 size_t dataSize = 0;
1738 Key key1 = {'1'};
1739 Value value1 = {'1'};
1740 EXPECT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
__anon2fced0530802() 1741 std::thread thread1([]() {
1742 std::this_thread::sleep_for(std::chrono::milliseconds(1));
1743 CipherPassword passwd; // random password
1744 vector<uint8_t> passwdBuffer(10, 45); // 10 and 45 as random password.
1745 passwd.SetValue(passwdBuffer.data(), passwdBuffer.size());
1746 g_kvDelegatePtr->Rekey(passwd);
1747 });
__anon2fced0530902() 1748 std::thread thread2([&dataSize, &key1, &value1]() {
1749 dataSize = g_kvDelegatePtr->GetSyncDataSize(DEVICE_B);
1750 if (dataSize > 0) {
1751 uint32_t expectedDataSize = (key1.size() + value1.size());
1752 uint32_t externalSize = 70u;
1753 uint32_t serialHeadLen = 8u;
1754 ASSERT_GE(static_cast<uint32_t>(dataSize), expectedDataSize);
1755 ASSERT_LE(static_cast<uint32_t>(dataSize), serialHeadLen + expectedDataSize + externalSize);
1756 }
1757 });
1758 thread1.join();
1759 thread2.join();
1760 }
1761
1762 /**
1763 * @tc.name: GetWaterMarkInfo001
1764 * @tc.desc: Test invalid dev for get water mark info.
1765 * @tc.type: FUNC
1766 * @tc.require:
1767 * @tc.author: zhangqiquan
1768 */
1769 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, GetWaterMarkInfo001, TestSize.Level0)
1770 {
1771 std::string dev;
1772 auto res = g_kvDelegatePtr->GetWatermarkInfo(dev);
1773 EXPECT_EQ(res.first, INVALID_ARGS);
1774 EXPECT_EQ(res.second.sendMark, 0u);
1775 EXPECT_EQ(res.second.receiveMark, 0u);
1776
1777 dev = std::string(DBConstant::MAX_DEV_LENGTH + 1, 'a');
1778 res = g_kvDelegatePtr->GetWatermarkInfo(dev);
1779 EXPECT_EQ(res.first, INVALID_ARGS);
1780 EXPECT_EQ(res.second.sendMark, 0u);
1781 EXPECT_EQ(res.second.receiveMark, 0u);
1782 }
1783
1784 /**
1785 * @tc.name: QuerySyncRetry001
1786 * @tc.desc: use sync retry sync use query pull by compress
1787 * @tc.type: FUNC
1788 * @tc.require:
1789 * @tc.author: zhangqiquan
1790 */
1791 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, QuerySyncRetry001, TestSize.Level3)
1792 {
1793 if (g_kvDelegatePtr != nullptr) {
1794 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1795 g_kvDelegatePtr = nullptr;
1796 }
1797 /**
1798 * @tc.steps: step1. open db use Compress
1799 * @tc.expected: step1, Pragma return OK.
1800 */
1801 KvStoreNbDelegate::Option option;
1802 option.isNeedCompressOnSync = true;
1803 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1804 ASSERT_TRUE(g_kvDelegateStatus == OK);
1805 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1806
1807 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, QUERY_SYNC_MESSAGE);
1808 std::vector<std::string> devices;
1809 devices.push_back(g_deviceB->GetDeviceId());
1810
1811 /**
1812 * @tc.steps: step2. set sync retry
1813 * @tc.expected: step2, Pragma return OK.
1814 */
1815 int pragmaData = 1;
1816 PragmaData input = static_cast<PragmaData>(&pragmaData);
1817 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1818
1819 /**
1820 * @tc.steps: step3. deviceA call sync and wait
1821 * @tc.expected: step3. sync should return OK.
1822 */
1823 std::map<std::string, DBStatus> result;
1824 auto query = Query::Select().PrefixKey({});
1825 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query) == OK);
1826
1827 /**
1828 * @tc.expected: step4. onComplete should be called, and status is time_out
1829 */
1830 ASSERT_TRUE(result.size() == devices.size());
1831 for (const auto &pair : result) {
1832 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1833 EXPECT_EQ(pair.second, OK);
1834 }
1835 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1836 }
1837 }