1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <condition_variable>
17 #include <gtest/gtest.h>
18 #include <thread>
19
20 #include "db_constant.h"
21 #include "db_common.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "platform_specific.h"
27 #include "single_ver_data_sync.h"
28 #include "single_ver_kv_sync_task_context.h"
29
30 using namespace testing::ext;
31 using namespace DistributedDB;
32 using namespace DistributedDBUnitTest;
33 using namespace std;
34
35 namespace {
36 string g_testDir;
37 const string STORE_ID = "kv_stroe_sync_test";
38 const int64_t TIME_OFFSET = 5000000;
39 const int WAIT_TIME = 1000;
40 const std::string DEVICE_A = "real_device";
41 const std::string DEVICE_B = "deviceB";
42 const std::string DEVICE_C = "deviceC";
43
44 KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
45 KvStoreConfig g_config;
46 DistributedDBToolsUnitTest g_tool;
47 DBStatus g_kvDelegateStatus = INVALID_ARGS;
48 KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
49 VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
50 KvVirtualDevice *g_deviceB = nullptr;
51 KvVirtualDevice *g_deviceC = nullptr;
52
53 // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
54 auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
55 placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
56
CalculateDataTest(uint32_t itemCount,uint32_t keySize,uint32_t valueSize)57 void CalculateDataTest(uint32_t itemCount, uint32_t keySize, uint32_t valueSize)
58 {
59 for (uint32_t i = 0; i < itemCount; i++) {
60 std::vector<uint8_t> prefixKey = {'a', 'b', 'c'};
61 Key key = DistributedDBToolsUnitTest::GetRandPrefixKey(prefixKey, keySize);
62 Value value;
63 DistributedDBToolsUnitTest::GetRandomKeyValue(value, valueSize);
64 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
65 }
66 size_t dataSize = g_kvDelegatePtr->GetSyncDataSize(DEVICE_B);
67 uint32_t expectedDataSize = (valueSize + keySize);
68 uint32_t externalSize = 70u;
69 uint32_t serialHeadLen = 8u;
70 LOGI("expectedDataSize=%u, v=%u", expectedDataSize, externalSize);
71 uint32_t maxDataSize = 1024u * 1024u;
72 if (itemCount * expectedDataSize >= maxDataSize) {
73 EXPECT_EQ(static_cast<uint32_t>(dataSize), maxDataSize);
74 return;
75 }
76 ASSERT_GE(static_cast<uint32_t>(dataSize), itemCount * expectedDataSize);
77 ASSERT_LE(static_cast<uint32_t>(dataSize), serialHeadLen + itemCount * (expectedDataSize + externalSize));
78 }
79 }
80
81 class DistributedDBSingleVerP2PSimpleSyncTest : public testing::Test {
82 public:
83 static void SetUpTestCase(void);
84 static void TearDownTestCase(void);
85 void SetUp();
86 void TearDown();
87 };
88
SetUpTestCase(void)89 void DistributedDBSingleVerP2PSimpleSyncTest::SetUpTestCase(void)
90 {
91 /**
92 * @tc.setup: Init datadir and Virtual Communicator.
93 */
94 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
95 g_config.dataDir = g_testDir;
96 g_mgr.SetKvStoreConfig(g_config);
97
98 string dir = g_testDir + "/single_ver";
99 DIR* dirTmp = opendir(dir.c_str());
100 if (dirTmp == nullptr) {
101 OS::MakeDBDirectory(dir);
102 } else {
103 closedir(dirTmp);
104 }
105
106 g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
107 ASSERT_TRUE(g_communicatorAggregator != nullptr);
108 RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
109 }
110
TearDownTestCase(void)111 void DistributedDBSingleVerP2PSimpleSyncTest::TearDownTestCase(void)
112 {
113 /**
114 * @tc.teardown: Release virtual Communicator and clear data dir.
115 */
116 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
117 LOGE("rm test db files error!");
118 }
119 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
120 }
121
SetUp(void)122 void DistributedDBSingleVerP2PSimpleSyncTest::SetUp(void)
123 {
124 DistributedDBToolsUnitTest::PrintTestCaseInfo();
125 /**
126 * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
127 */
128 KvStoreNbDelegate::Option option;
129 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
130 ASSERT_TRUE(g_kvDelegateStatus == OK);
131 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
132 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
133 ASSERT_TRUE(g_deviceB != nullptr);
134 VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
135 ASSERT_TRUE(syncInterfaceB != nullptr);
136 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
137
138 g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
139 ASSERT_TRUE(g_deviceC != nullptr);
140 VirtualSingleVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
141 ASSERT_TRUE(syncInterfaceC != nullptr);
142 ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
143
144 auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
145 const std::string &deviceId, uint8_t flag) -> bool {
146 return true;
147 };
148 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
149 }
150
TearDown(void)151 void DistributedDBSingleVerP2PSimpleSyncTest::TearDown(void)
152 {
153 /**
154 * @tc.teardown: Release device A, B, C
155 */
156 if (g_kvDelegatePtr != nullptr) {
157 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
158 g_kvDelegatePtr = nullptr;
159 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
160 LOGD("delete kv store status %d", status);
161 ASSERT_TRUE(status == OK);
162 }
163 if (g_deviceB != nullptr) {
164 delete g_deviceB;
165 g_deviceB = nullptr;
166 }
167 if (g_deviceC != nullptr) {
168 delete g_deviceC;
169 g_deviceC = nullptr;
170 }
171 PermissionCheckCallbackV2 nullCallback;
172 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
173 }
174
175 /**
176 * @tc.name: Normal Sync 001
177 * @tc.desc: Test normal push sync for add data.
178 * @tc.type: FUNC
179 * @tc.require: AR000CQS3S SR000CQE0B
180 * @tc.author: xushaohua
181 */
182 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync001, TestSize.Level1)
183 {
184 DBStatus status = OK;
185 std::vector<std::string> devices;
186 devices.push_back(g_deviceB->GetDeviceId());
187 devices.push_back(g_deviceC->GetDeviceId());
188
189 /**
190 * @tc.steps: step1. deviceA put {k1, v1}
191 */
192 Key key = {'1'};
193 Value value = {'1'};
194 status = g_kvDelegatePtr->Put(key, value);
195 ASSERT_TRUE(status == OK);
196
197 /**
198 * @tc.steps: step2. deviceA call sync and wait
199 * @tc.expected: step2. sync should return OK.
200 */
201 std::map<std::string, DBStatus> result;
202 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
203 ASSERT_TRUE(status == OK);
204
205 /**
206 * @tc.expected: step2. onComplete should be called, DeviceB,C have {k1,v1}
207 */
208 ASSERT_TRUE(result.size() == devices.size());
209 for (const auto &pair : result) {
210 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
211 EXPECT_TRUE(pair.second == OK);
212 }
213 VirtualDataItem item;
214 g_deviceB->GetData(key, item);
215 EXPECT_TRUE(item.value == value);
216 g_deviceC->GetData(key, item);
217 EXPECT_TRUE(item.value == value);
218 }
219
220 /**
221 * @tc.name: Normal Sync 002
222 * @tc.desc: Test normal push sync for update data.
223 * @tc.type: FUNC
224 * @tc.require: AR000CCPOM
225 * @tc.author: xushaohua
226 */
227 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync002, TestSize.Level1)
228 {
229 DBStatus status = OK;
230 std::vector<std::string> devices;
231 devices.push_back(g_deviceB->GetDeviceId());
232 devices.push_back(g_deviceC->GetDeviceId());
233
234 /**
235 * @tc.steps: step1. deviceA put {k1, v1}
236 */
237 Key key = {'1'};
238 Value value = {'1'};
239 status = g_kvDelegatePtr->Put(key, value);
240 ASSERT_TRUE(status == OK);
241
242 /**
243 * @tc.steps: step2. deviceA put {k1, v2}
244 */
245 Value value2;
246 value2.push_back('2');
247 status = g_kvDelegatePtr->Put(key, value2);
248 ASSERT_TRUE(status == OK);
249
250 /**
251 * @tc.steps: step3. deviceA call sync and wait
252 * @tc.expected: step3. sync should return OK.
253 */
254 std::map<std::string, DBStatus> result;
255 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
256 ASSERT_TRUE(status == OK);
257
258 /**
259 * @tc.expected: step3. onComplete should be called, DeviceB,C have {k1,v2}
260 */
261 ASSERT_TRUE(result.size() == devices.size());
262 for (const auto &pair : result) {
263 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
264 EXPECT_TRUE(pair.second == OK);
265 }
266 VirtualDataItem item;
267 g_deviceC->GetData(key, item);
268 EXPECT_TRUE(item.value == value2);
269 g_deviceB->GetData(key, item);
270 EXPECT_TRUE(item.value == value2);
271 }
272
273 /**
274 * @tc.name: Normal Sync 003
275 * @tc.desc: Test normal push sync for delete data.
276 * @tc.type: FUNC
277 * @tc.require: AR000CQS3S
278 * @tc.author: xushaohua
279 */
280 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync003, TestSize.Level1)
281 {
282 DBStatus status = OK;
283 std::vector<std::string> devices;
284 devices.push_back(g_deviceB->GetDeviceId());
285 devices.push_back(g_deviceC->GetDeviceId());
286
287 /**
288 * @tc.steps: step1. deviceA put {k1, v1}
289 */
290 Key key = {'1'};
291 Value value = {'1'};
292 status = g_kvDelegatePtr->Put(key, value);
293 ASSERT_TRUE(status == OK);
294
295 /**
296 * @tc.steps: step2. deviceA delete k1
297 */
298 status = g_kvDelegatePtr->Delete(key);
299 ASSERT_TRUE(status == OK);
300 std::map<std::string, DBStatus> result;
301 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
302 ASSERT_TRUE(status == OK);
303
304 /**
305 * @tc.steps: step3. deviceA call sync and wait
306 * @tc.expected: step3. sync should return OK.
307 */
308 ASSERT_TRUE(result.size() == devices.size());
309 for (const auto &pair : result) {
310 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
311 EXPECT_TRUE(pair.second == OK);
312 }
313
314 /**
315 * @tc.expected: step3. onComplete should be called, DeviceB,C have {k1, delete}
316 */
317 VirtualDataItem item;
318 Key hashKey;
319 DistributedDBToolsUnitTest::CalcHash(key, hashKey);
320 EXPECT_EQ(g_deviceB->GetData(hashKey, item), -E_NOT_FOUND);
321 EXPECT_EQ(g_deviceC->GetData(hashKey, item), -E_NOT_FOUND);
322 }
323
324 /**
325 * @tc.name: Normal Sync 004
326 * @tc.desc: Test normal pull sync for add data.
327 * @tc.type: FUNC
328 * @tc.require: AR000CCPOM
329 * @tc.author: xushaohua
330 */
331 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync004, TestSize.Level1)
332 {
333 DBStatus status = OK;
334 std::vector<std::string> devices;
335 devices.push_back(g_deviceB->GetDeviceId());
336 devices.push_back(g_deviceC->GetDeviceId());
337
338 /**
339 * @tc.steps: step1. deviceB put {k1, v1}
340 */
341 Key key = {'1'};
342 Value value = {'1'};
343 g_deviceB->PutData(key, value, 0, 0);
344
345 /**
346 * @tc.steps: step2. deviceB put {k2, v2}
347 */
348 Key key2 = {'2'};
349 Value value2 = {'2'};
350 g_deviceC->PutData(key2, value2, 0, 0);
351 ASSERT_TRUE(status == OK);
352
353 /**
354 * @tc.steps: step3. deviceA call pull sync
355 * @tc.expected: step3. sync should return OK.
356 */
357 std::map<std::string, DBStatus> result;
358 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
359 ASSERT_TRUE(status == OK);
360
361 /**
362 * @tc.expected: step3. onComplete should be called, DeviceA have {k1, VALUE_1}, {K2. VALUE_2}
363 */
364 ASSERT_TRUE(result.size() == devices.size());
365 for (const auto &pair : result) {
366 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
367 EXPECT_TRUE(pair.second == OK);
368 }
369 Value value3;
370 EXPECT_EQ(g_kvDelegatePtr->Get(key, value3), OK);
371 EXPECT_EQ(value3, value);
372 EXPECT_EQ(g_kvDelegatePtr->Get(key2, value3), OK);
373 EXPECT_EQ(value3, value2);
374 }
375
376 /**
377 * @tc.name: Normal Sync 005
378 * @tc.desc: Test normal pull sync for update data.
379 * @tc.type: FUNC
380 * @tc.require: AR000CCPOM SR000CQE10
381 * @tc.author: xushaohua
382 */
383 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync005, TestSize.Level2)
384 {
385 DBStatus status = OK;
386 std::vector<std::string> devices;
387 devices.push_back(g_deviceB->GetDeviceId());
388 devices.push_back(g_deviceC->GetDeviceId());
389
390 /**
391 * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
392 */
393 Key key1 = {'1'};
394 Value value1 = {'1'};
395 status = g_kvDelegatePtr->Put(key1, value1);
396 ASSERT_TRUE(status == OK);
397 Key key2 = {'2'};
398 Value value2 = {'2'};
399 status = g_kvDelegatePtr->Put(key2, value2);
400 ASSERT_TRUE(status == OK);
401
402 /**
403 * @tc.steps: step2. deviceB put {k1, v3} t2, t2 > t1
404 */
405 Value value3;
406 value3.push_back('3');
407 g_deviceB->PutData(key1, value3,
408 TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 0);
409
410 /**
411 * @tc.steps: step3. deviceC put {k2, v4} t2, t4 < t1
412 */
413 Value value4;
414 value4.push_back('4');
415 g_deviceC->PutData(key2, value4,
416 TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 0);
417
418 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
419 /**
420 * @tc.steps: step4. deviceA call pull sync
421 * @tc.expected: step4. sync should return OK.
422 */
423 std::map<std::string, DBStatus> result;
424 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
425 ASSERT_TRUE(status == OK);
426
427 /**
428 * @tc.expected: step4. onComplete should be called, DeviceA have {k1, v3}, {k2. v2}
429 */
430 ASSERT_TRUE(result.size() == devices.size());
431 for (const auto &pair : result) {
432 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
433 EXPECT_TRUE(pair.second == OK);
434 }
435
436 Value value5;
437 g_kvDelegatePtr->Get(key1, value5);
438 EXPECT_TRUE(value5 == value3);
439 g_kvDelegatePtr->Get(key2, value5);
440 EXPECT_TRUE(value5 == value2);
441 }
442
443 /**
444 * @tc.name: Normal Sync 006
445 * @tc.desc: Test normal pull sync for delete data.
446 * @tc.type: FUNC
447 * @tc.require: AR000CQS3S
448 * @tc.author: xushaohua
449 */
450 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync006, TestSize.Level2)
451 {
452 /**
453 * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
454 */
455 Key key1 = {'1'};
456 Value value1 = {'1'};
457 DBStatus status = g_kvDelegatePtr->Put(key1, value1);
458 ASSERT_TRUE(status == OK);
459 Key key2 = {'2'};
460 Value value2 = {'2'};
461 status = g_kvDelegatePtr->Put(key2, value2);
462 ASSERT_TRUE(status == OK);
463
464 /**
465 * @tc.steps: step2. deviceA put {k1, delete} t2, t2 <t1
466 */
467 Key hashKey1;
468 DistributedDBToolsUnitTest::CalcHash(key1, hashKey1);
469 g_deviceB->PutData(hashKey1, value1,
470 TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 1);
471
472 /**
473 * @tc.steps: step3. deviceA put {k1, delete} t3, t3 < t1
474 */
475 Key hashKey2;
476 DistributedDBToolsUnitTest::CalcHash(key2, hashKey2);
477 g_deviceC->PutData(hashKey2, value1,
478 TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 0);
479
480 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
481 /**
482 * @tc.steps: step4. deviceA call pull sync
483 * @tc.expected: step4. sync should return OK.
484 */
485 std::map<std::string, DBStatus> result;
486 std::vector<std::string> devices;
487 devices.push_back(g_deviceB->GetDeviceId());
488 devices.push_back(g_deviceC->GetDeviceId());
489 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
490 ASSERT_TRUE(status == OK);
491
492 /**
493 * @tc.expected: step4. onComplete should be called, DeviceA have {k2. v2} don't have k1
494 */
495 ASSERT_TRUE(result.size() == devices.size());
496 for (const auto &pair : result) {
497 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
498 EXPECT_TRUE(pair.second == OK);
499 }
500 Value value5;
501 g_kvDelegatePtr->Get(key1, value5);
502 EXPECT_TRUE(value5.empty());
503 g_kvDelegatePtr->Get(key2, value5);
504 EXPECT_TRUE(value5 == value2);
505 }
506
507 /**
508 * @tc.name: Normal Sync 007
509 * @tc.desc: Test normal push_pull sync for add data.
510 * @tc.type: FUNC
511 * @tc.require: AR000CCPOM
512 * @tc.author: xushaohua
513 */
514 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync007, TestSize.Level1)
515 {
516 DBStatus status = OK;
517 std::vector<std::string> devices;
518 devices.push_back(g_deviceB->GetDeviceId());
519 devices.push_back(g_deviceC->GetDeviceId());
520
521 /**
522 * @tc.steps: step1. deviceA put {k1, v1}
523 */
524 Key key1 = {'1'};
525 Value value1 = {'1'};
526 status = g_kvDelegatePtr->Put(key1, value1);
527 EXPECT_TRUE(status == OK);
528
529 /**
530 * @tc.steps: step1. deviceB put {k2, v2}
531 */
532 Key key2 = {'2'};
533 Value value2 = {'2'};
534 g_deviceB->PutData(key2, value2, 0, 0);
535
536 /**
537 * @tc.steps: step1. deviceB put {k3, v3}
538 */
539 Key key3 = {'3'};
540 Value value3 = {'3'};
541 g_deviceC->PutData(key3, value3, 0, 0);
542
543 /**
544 * @tc.steps: step4. deviceA call push_pull sync
545 * @tc.expected: step4. sync should return OK.
546 */
547 std::map<std::string, DBStatus> result;
548 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
549 ASSERT_TRUE(status == OK);
550
551 ASSERT_TRUE(result.size() == devices.size());
552 for (const auto &pair : result) {
553 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
554 EXPECT_TRUE(pair.second == OK);
555 }
556
557 /**
558 * @tc.expected: step4. onComplete should be called, DeviceA have {k1. v1}, {k2, v2}, {k3, v3}
559 * deviceB received {k1. v1}, don't received k3, deviceC received {k1. v1}, don't received k2
560 */
561 Value value4;
562 g_kvDelegatePtr->Get(key2, value4);
563 EXPECT_TRUE(value4 == value2);
564 g_kvDelegatePtr->Get(key3, value4);
565 EXPECT_TRUE(value4 == value3);
566
567 VirtualDataItem item1;
568 g_deviceB->GetData(key1, item1);
569 EXPECT_TRUE(item1.value == value1);
570 item1.value.clear();
571 g_deviceB->GetData(key3, item1);
572 EXPECT_TRUE(item1.value.empty());
573
574 VirtualDataItem item2;
575 g_deviceC->GetData(key1, item2);
576 EXPECT_TRUE(item2.value == value1);
577 item2.value.clear();
578 g_deviceC->GetData(key2, item2);
579 EXPECT_TRUE(item2.value.empty());
580 }
581
582 /**
583 * @tc.name: Normal Sync 008
584 * @tc.desc: Test normal push_pull sync for update data.
585 * @tc.type: FUNC
586 * @tc.require: AR000CCPOM
587 * @tc.author: xushaohua
588 */
589 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync008, TestSize.Level2)
590 {
591 DBStatus status = OK;
592 std::vector<std::string> devices;
593 devices.push_back(g_deviceB->GetDeviceId());
594 devices.push_back(g_deviceC->GetDeviceId());
595
596 /**
597 * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
598 */
599 Key key1 = {'1'};
600 Value value1 = {'1'};
601 status = g_kvDelegatePtr->Put(key1, value1);
602 ASSERT_TRUE(status == OK);
603
604 Key key2 = {'2'};
605 Value value2 = {'2'};
606 status = g_kvDelegatePtr->Put(key2, value2);
607 ASSERT_TRUE(status == OK);
608
609 /**
610 * @tc.steps: step2. deviceB put {k1, v3} t2, t2 > t1
611 */
612 Value value3 = {'3'};
613 g_deviceB->PutData(key1, value3,
614 TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 0);
615
616 /**
617 * @tc.steps: step3. deviceB put {k1, v4} t3, t4 <t1
618 */
619 Value value4 = {'4'};
620 g_deviceC->PutData(key2, value4,
621 TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 0);
622 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
623
624 /**
625 * @tc.steps: step4. deviceA call push_pull sync
626 * @tc.expected: step4. sync should return OK.
627 */
628 std::map<std::string, DBStatus> result;
629 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
630 ASSERT_TRUE(status == OK);
631 ASSERT_TRUE(result.size() == devices.size());
632 for (const auto &pair : result) {
633 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
634 EXPECT_TRUE(pair.second == OK);
635 }
636
637 /**
638 * @tc.expected: step4. onComplete should be called, DeviceA have {k1. v3}, {k2, v2}
639 * deviceB have {k1. v3}, deviceC have {k2. v2}
640 */
641 Value value5;
642 g_kvDelegatePtr->Get(key1, value5);
643 EXPECT_EQ(value5, value3);
644 g_kvDelegatePtr->Get(key2, value5);
645 EXPECT_EQ(value5, value2);
646
647 VirtualDataItem item1;
648 g_deviceB->GetData(key1, item1);
649 EXPECT_TRUE(item1.value == value3);
650 item1.value.clear();
651 g_deviceB->GetData(key2, item1);
652 EXPECT_TRUE(item1.value == value2);
653
654 VirtualDataItem item2;
655 g_deviceC->GetData(key2, item2);
656 EXPECT_TRUE(item2.value == value2);
657 }
658
659 /**
660 * @tc.name: Normal Sync 009
661 * @tc.desc: Test normal push_pull sync for delete data.
662 * @tc.type: FUNC
663 * @tc.require: AR000CCPOM
664 * @tc.author: xushaohua
665 */
666 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync009, TestSize.Level2)
667 {
668 DBStatus status = OK;
669 std::vector<std::string> devices;
670 devices.push_back(g_deviceB->GetDeviceId());
671 devices.push_back(g_deviceC->GetDeviceId());
672
673 /**
674 * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
675 */
676 Key key1 = {'1'};
677 Value value1 = {'1'};
678 status = g_kvDelegatePtr->Put(key1, value1);
679 ASSERT_TRUE(status == OK);
680
681 Key key2 = {'2'};
682 Value value2 = {'2'};
683 status = g_kvDelegatePtr->Put(key2, value2);
684 ASSERT_TRUE(status == OK);
685
686 /**
687 * @tc.steps: step2. deviceB put {k1, delete} t2, t2 > t1
688 */
689 Key hashKey1;
690 DistributedDBToolsUnitTest::CalcHash(key1, hashKey1);
691 g_deviceB->PutData(hashKey1, value1,
692 TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 1);
693
694 /**
695 * @tc.steps: step3. deviceB put {k1, delete} t3, t2 < t1
696 */
697 Key hashKey2;
698 DistributedDBToolsUnitTest::CalcHash(key2, hashKey2);
699 g_deviceC->PutData(hashKey2, value2,
700 TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 1);
701
702 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
703 /**
704 * @tc.steps: step4. deviceA call push_pull sync
705 * @tc.expected: step4. sync should return OK.
706 */
707 std::map<std::string, DBStatus> result;
708 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
709 ASSERT_TRUE(status == OK);
710
711 /**
712 * @tc.expected: step4. onComplete should be called, DeviceA have {k1. delete}, {k2, v2}
713 * deviceB have {k2. v2}, deviceC have {k2. v2}
714 */
715 ASSERT_TRUE(result.size() == devices.size());
716 for (const auto &pair : result) {
717 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
718 EXPECT_TRUE(pair.second == OK);
719 }
720
721 Value value3;
722 g_kvDelegatePtr->Get(key1, value3);
723 EXPECT_TRUE(value3.empty());
724 value3.clear();
725 g_kvDelegatePtr->Get(key2, value3);
726 EXPECT_TRUE(value3 == value2);
727
728 VirtualDataItem item1;
729 g_deviceB->GetData(key2, item1);
730 EXPECT_TRUE(item1.value == value2);
731
732 VirtualDataItem item2;
733 g_deviceC->GetData(key2, item2);
734 EXPECT_TRUE(item2.value == value2);
735 }
736
737 /**
738 * @tc.name: Normal Sync 010
739 * @tc.desc: Test sync failed by invalid devices.
740 * @tc.type: FUNC
741 * @tc.require: AR000CCPOM
742 * @tc.author: zhangqiquan
743 */
744 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync010, TestSize.Level1)
745 {
746 DBStatus status = OK;
747 std::vector<std::string> devices;
748 std::string invalidDev = std::string(DBConstant::MAX_DEV_LENGTH + 1, '0');
749 devices.push_back(DEVICE_A);
750 devices.push_back(g_deviceB->GetDeviceId());
751 devices.push_back(invalidDev);
752
753 std::map<std::string, DBStatus> result;
754 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
755 ASSERT_TRUE(status == OK);
756
757 ASSERT_EQ(result.size(), devices.size());
758 EXPECT_EQ(result[DEVICE_A], INVALID_ARGS);
759 EXPECT_EQ(result[invalidDev], INVALID_ARGS);
760 EXPECT_EQ(result[DEVICE_B], OK);
761 }
762
763 /**
764 * @tc.name: Limit Data Sync 001
765 * @tc.desc: Test sync limit key and value data
766 * @tc.type: FUNC
767 * @tc.require: AR000CCPOM
768 * @tc.author: xushaohua
769 */
770 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, LimitDataSync001, TestSize.Level1)
771 {
772 DBStatus status = OK;
773 std::vector<std::string> devices;
774 devices.push_back(g_deviceB->GetDeviceId());
775
776 Key key1;
777 Value value1;
778 DistributedDBToolsUnitTest::GetRandomKeyValue(key1, DBConstant::MAX_KEY_SIZE + 1);
779 DistributedDBToolsUnitTest::GetRandomKeyValue(value1, DBConstant::MAX_VALUE_SIZE + 1);
780
781 Key key2;
782 Value value2;
783 DistributedDBToolsUnitTest::GetRandomKeyValue(key2, DBConstant::MAX_KEY_SIZE);
784 DistributedDBToolsUnitTest::GetRandomKeyValue(value2, DBConstant::MAX_VALUE_SIZE);
785
786 /**
787 * @tc.steps: step1. deviceB put {k1, v1}, K1 > 1k, v1 > 4M
788 */
789 g_deviceB->PutData(key1, value1, 0, 0);
790
791 /**
792 * @tc.steps: step2. deviceB put {k2, v2}, K2 = 1k, v2 = 4M
793 */
794 g_deviceC->PutData(key2, value2, 0, 0);
795
796 /**
797 * @tc.steps: step3. deviceA call pull sync from device B
798 * @tc.expected: step3. sync should return OK.
799 */
800 std::map<std::string, DBStatus> result;
801 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
802 ASSERT_TRUE(status == OK);
803
804 /**
805 * @tc.expected: step3. onComplete should be called.
806 */
807 ASSERT_TRUE(result.size() == devices.size());
808 for (const auto &pair : result) {
809 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
810 if (pair.first == g_deviceB->GetDeviceId()) {
811 EXPECT_TRUE(pair.second != OK);
812 } else {
813 EXPECT_TRUE(pair.second == OK);
814 }
815 }
816
817 /**
818 * @tc.steps: step4. deviceA call pull sync from deviceC
819 * @tc.expected: step4. sync should return OK.
820 */
821 devices.clear();
822 result.clear();
823 devices.push_back(g_deviceC->GetDeviceId());
824 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
825 ASSERT_TRUE(status == OK);
826
827 /**
828 * @tc.expected: step4. onComplete should be called, DeviceA have {k2. v2}, don't have {k1, v1}
829 */
830 ASSERT_TRUE(result.size() == devices.size());
831 for (const auto &pair : result) {
832 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
833 EXPECT_TRUE(pair.second == OK);
834 }
835
836 // Get value from A
837 Value valueRead;
838 EXPECT_TRUE(g_kvDelegatePtr->Get(key1, valueRead) != OK);
839 valueRead.clear();
840 EXPECT_EQ(g_kvDelegatePtr->Get(key2, valueRead), OK);
841 EXPECT_TRUE(valueRead == value2);
842 }
843
844 /**
845 * @tc.name: Auto Sync 001
846 * @tc.desc: Verify auto sync enable function.
847 * @tc.type: FUNC
848 * @tc.require: AR000CKRTD AR000CQE0E
849 * @tc.author: xushaohua
850 */
851 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, AutoSync001, TestSize.Level1)
852 {
853 std::vector<std::string> devices;
854 devices.push_back(g_deviceB->GetDeviceId());
855 devices.push_back(g_deviceC->GetDeviceId());
856
857 /**
858 * @tc.steps: step1. enable auto sync
859 * @tc.expected: step1, Pragma return OK.
860 */
861 bool autoSync = true;
862 PragmaData data = static_cast<PragmaData>(&autoSync);
863 DBStatus status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
864 ASSERT_EQ(status, OK);
865
866 /**
867 * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
868 */
869 ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
870 ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_2, VALUE_2) == OK);
871
872 /**
873 * @tc.steps: step3. sleep for data sync
874 * @tc.expected: step3. deviceB,C has {k1, v1}, {k2, v2}
875 */
876 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
877 VirtualDataItem item;
878 g_deviceB->GetData(KEY_1, item);
879 EXPECT_EQ(item.value, VALUE_1);
880 g_deviceB->GetData(KEY_2, item);
881 EXPECT_EQ(item.value, VALUE_2);
882 g_deviceC->GetData(KEY_1, item);
883 EXPECT_EQ(item.value, VALUE_1);
884 g_deviceC->GetData(KEY_2, item);
885 EXPECT_EQ(item.value, VALUE_2);
886 }
887
888 /**
889 * @tc.name: Auto Sync 002
890 * @tc.desc: Verify auto sync disable function.
891 * @tc.type: FUNC
892 * @tc.require: AR000CKRTD AR000CQE0E
893 * @tc.author: xushaohua
894 */
895 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, AutoSync002, TestSize.Level1)
896 {
897 std::vector<std::string> devices;
898 devices.push_back(g_deviceB->GetDeviceId());
899 devices.push_back(g_deviceC->GetDeviceId());
900
901 /**
902 * @tc.steps: step1. disable auto sync
903 * @tc.expected: step1, Pragma return OK.
904 */
905 bool autoSync = false;
906 PragmaData data = static_cast<PragmaData>(&autoSync);
907 DBStatus status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
908 ASSERT_EQ(status, OK);
909
910 /**
911 * @tc.steps: step2. deviceB put {k1, v1}, deviceC put {k2, v2}
912 */
913 g_deviceB->PutData(KEY_1, VALUE_1, 0, 0);
914 g_deviceC->PutData(KEY_2, VALUE_2, 0, 0);
915
916 /**
917 * @tc.steps: step3. sleep for data sync
918 * @tc.expected: step3. deviceA don't have k1, k2.
919 */
920 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
921 Value value3;
922 EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_1, value3) == NOT_FOUND);
923 EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_2, value3) == NOT_FOUND);
924 }
925
926 /**
927 * @tc.name: Block Sync 001
928 * @tc.desc: Verify block push sync function.
929 * @tc.type: FUNC
930 * @tc.require: AR000CKRTD AR000CQE0E
931 * @tc.author: xushaohua
932 */
933 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync001, TestSize.Level1)
934 {
935 std::vector<std::string> devices;
936 devices.push_back(g_deviceB->GetDeviceId());
937 devices.push_back(g_deviceC->GetDeviceId());
938
939 /**
940 * @tc.steps: step1. deviceA put {k1, v1}
941 */
942 g_kvDelegatePtr->Put(KEY_1, VALUE_1);
943
944 /**
945 * @tc.steps: step2. deviceA call block push sync to deviceB & deviceC.
946 * @tc.expected: step2. Sync return OK, devices status OK, deviceB & deivceC has {k1, v1}.
947 */
948 std::map<std::string, DBStatus> result;
949 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
950 ASSERT_EQ(status, OK);
951 ASSERT_TRUE(result.size() == devices.size());
952 for (const auto &pair : result) {
953 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
954 EXPECT_TRUE(pair.second == OK);
955 }
956 VirtualDataItem item1;
957 EXPECT_EQ(g_deviceB->GetData(KEY_1, item1), OK);
958 EXPECT_EQ(item1.value, VALUE_1);
959 VirtualDataItem item2;
960 EXPECT_EQ(g_deviceC->GetData(KEY_1, item2), OK);
961 EXPECT_EQ(item2.value, VALUE_1);
962 }
963
964 /**
965 * @tc.name: Block Sync 002
966 * @tc.desc: Verify block pull sync function.
967 * @tc.type: FUNC
968 * @tc.require: AR000CKRTD AR000CQE0E
969 * @tc.author: xushaohua
970 */
971 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync002, TestSize.Level1)
972 {
973 std::vector<std::string> devices;
974 devices.push_back(g_deviceB->GetDeviceId());
975 devices.push_back(g_deviceC->GetDeviceId());
976
977 /**
978 * @tc.steps: step1. deviceB put {k1, v1}, deviceC put {k2, v2}
979 */
980 g_deviceB->PutData(KEY_1, VALUE_1, 0, 0);
981 g_deviceC->PutData(KEY_2, VALUE_2, 0, 0);
982
983 /**
984 * @tc.steps: step2. deviceA call block pull and pull sync to deviceB & deviceC.
985 * @tc.expected: step2. Sync return OK, devices status OK, deviceA has {k1, v1}, {k2, v2}
986 */
987 std::map<std::string, DBStatus> result;
988 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
989 ASSERT_EQ(status, OK);
990 ASSERT_TRUE(result.size() == devices.size());
991 for (const auto &pair : result) {
992 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
993 EXPECT_TRUE(pair.second == OK);
994 }
995 Value value3;
996 EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_1, value3) == OK);
997 EXPECT_TRUE(value3 == VALUE_1);
998 EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_2, value3) == OK);
999 EXPECT_TRUE(value3 == VALUE_2);
1000 }
1001
1002 /**
1003 * @tc.name: Block Sync 003
1004 * @tc.desc: Verify block push and pull sync function.
1005 * @tc.type: FUNC
1006 * @tc.require: AR000CKRTD AR000CQE0E
1007 * @tc.author: xushaohua
1008 */
1009 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync003, TestSize.Level1)
1010 {
1011 std::vector<std::string> devices;
1012 devices.push_back(g_deviceB->GetDeviceId());
1013 devices.push_back(g_deviceC->GetDeviceId());
1014
1015 /**
1016 * @tc.steps: step1. deviceA put {k1, v1}
1017 */
1018 g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1019
1020 /**
1021 * @tc.steps: step2. deviceB put {k1, v1}, deviceB put {k2, v2}
1022 */
1023 g_deviceB->PutData(KEY_2, VALUE_2, 0, 0);
1024 g_deviceC->PutData(KEY_3, VALUE_3, 0, 0);
1025
1026 /**
1027 * @tc.steps: step3. deviceA call block pull and pull sync to deviceB & deviceC.
1028 * @tc.expected: step3. Sync return OK, devices status OK, deviceA has {k1, v1}, {k2, v2} {k3, v3}
1029 * deviceB has {k1, v1}, {k2. v2} , deviceC has {k1, v1}, {k3, v3}
1030 */
1031 std::map<std::string, DBStatus> result;
1032 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, true);
1033 ASSERT_EQ(status, OK);
1034 ASSERT_TRUE(result.size() == devices.size());
1035 for (const auto &pair : result) {
1036 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1037 EXPECT_TRUE(pair.second == OK);
1038 }
1039
1040 VirtualDataItem item1;
1041 g_deviceB->GetData(KEY_1, item1);
1042 EXPECT_TRUE(item1.value == VALUE_1);
1043 g_deviceB->GetData(KEY_2, item1);
1044 EXPECT_TRUE(item1.value == VALUE_2);
1045
1046 VirtualDataItem item2;
1047 g_deviceC->GetData(KEY_1, item2);
1048 EXPECT_TRUE(item2.value == VALUE_1);
1049 g_deviceC->GetData(KEY_3, item2);
1050 EXPECT_TRUE(item2.value == VALUE_3);
1051
1052 Value value3;
1053 EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_1, value3) == OK);
1054 EXPECT_TRUE(value3 == VALUE_1);
1055 EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_2, value3) == OK);
1056 EXPECT_TRUE(value3 == VALUE_2);
1057 EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_3, value3) == OK);
1058 EXPECT_TRUE(value3 == VALUE_3);
1059 }
1060
1061 /**
1062 * @tc.name: Block Sync 004
1063 * @tc.desc: Verify block sync function invalid args.
1064 * @tc.type: FUNC
1065 * @tc.require: AR000CKRTD AR000CQE0E
1066 * @tc.author: xushaohua
1067 */
1068 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync004, TestSize.Level2)
1069 {
1070 std::vector<std::string> devices;
1071
1072 /**
1073 * @tc.steps: step1. deviceA put {k1, v1}
1074 */
1075 g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1076
1077 /**
1078 * @tc.steps: step2. deviceA call block push sync to deviceB & deviceC.
1079 * @tc.expected: step2. Sync return INVALID_ARGS
1080 */
1081 std::map<std::string, DBStatus> result;
1082 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1083 EXPECT_EQ(status, INVALID_ARGS);
1084
1085 /**
1086 * @tc.steps: step3. deviceB, deviceC offlinem and push deviceA sync to deviceB and deviceC.
1087 * @tc.expected: step3. Sync return OK, but the deviceB and deviceC are TIME_OUT
1088 */
1089 devices.push_back(g_deviceB->GetDeviceId());
1090 devices.push_back(g_deviceC->GetDeviceId());
1091 g_deviceB->Offline();
1092 g_deviceC->Offline();
1093
1094 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1095 EXPECT_EQ(status, OK);
1096 ASSERT_TRUE(result.size() == devices.size());
1097 for (const auto &pair : result) {
1098 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1099 EXPECT_TRUE(pair.second == COMM_FAILURE);
1100 }
1101 }
1102
1103 /**
1104 * @tc.name: Block Sync 005
1105 * @tc.desc: Verify block sync function busy.
1106 * @tc.type: FUNC
1107 * @tc.require: AR000CKRTD AR000CQE0E
1108 * @tc.author: xushaohua
1109 */
1110 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync005, TestSize.Level2)
1111 {
1112 std::vector<std::string> devices;
1113 devices.push_back(g_deviceB->GetDeviceId());
1114 devices.push_back(g_deviceC->GetDeviceId());
1115 /**
1116 * @tc.steps: step1. deviceA put {k1, v1}
1117 */
1118 g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1119
1120 /**
1121 * @tc.steps: step2. New a thread to deviceA call block push sync to deviceB & deviceC,
1122 * but deviceB & C is blocked
1123 * @tc.expected: step2. Sync will be blocked util timeout, and then return OK
1124 */
1125 g_deviceB->Offline();
1126 g_deviceC->Offline();
__anond4119c320302() 1127 thread thread([devices]() {
1128 std::map<std::string, DBStatus> resultInner;
1129 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, resultInner, true);
1130 EXPECT_EQ(status, OK);
1131 });
1132 thread.detach();
1133 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1134 /**
1135 * @tc.steps: step3. sleep 1s and call sync.
1136 * @tc.expected: step3. Sync will return BUSY.
1137 */
1138 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1139 std::map<std::string, DBStatus> result;
1140 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, true);
1141 EXPECT_EQ(status, OK);
1142 }
1143
1144 /**
1145 * @tc.name: SyncQueue001
1146 * @tc.desc: Invalid args check of Pragma GET_QUEUED_SYNC_SIZE SET_QUEUED_SYNC_LIMIT and
1147 * GET_QUEUED_SYNC_LIMIT, expect return INVALID_ARGS.
1148 * @tc.type: FUNC
1149 * @tc.require: AR000D4876
1150 * @tc.author: wangchuanqing
1151 */
1152 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue001, TestSize.Level3)
1153 {
1154 /**
1155 * @tc.steps:step1. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE, and set param to be null
1156 * @tc.expected: step1. Expect return INVALID_ARGS.
1157 */
1158 int *param = nullptr;
1159 PragmaData input = static_cast<PragmaData>(param);
1160 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), INVALID_ARGS);
1161
1162 /**
1163 * @tc.steps:step2. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be null
1164 * @tc.expected: step2. Expect return INVALID_ARGS.
1165 */
1166 input = static_cast<PragmaData>(param);
1167 EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1168
1169 /**
1170 * @tc.steps:step3. Set PragmaCmd to be GET_QUEUED_SYNC_LIMIT, and set param to be null
1171 * @tc.expected: step3. Expect return INVALID_ARGS.
1172 */
1173 input = static_cast<PragmaData>(param);
1174 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1175
1176 /**
1177 * @tc.steps:step4. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be QUEUED_SYNC_LIMIT_MIN - 1
1178 * @tc.expected: step4. Expect return INVALID_ARGS.
1179 */
1180 int limit = DBConstant::QUEUED_SYNC_LIMIT_MIN - 1;
1181 input = static_cast<PragmaData>(&limit);
1182 EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1183
1184 /**
1185 * @tc.steps:step5. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be QUEUED_SYNC_LIMIT_MAX + 1
1186 * @tc.expected: step5. Expect return INVALID_ARGS.
1187 */
1188 limit = DBConstant::QUEUED_SYNC_LIMIT_MAX + 1;
1189 input = static_cast<PragmaData>(&limit);
1190 EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1191 }
1192
1193 /**
1194 * @tc.name: SyncQueue002
1195 * @tc.desc: Pragma GET_QUEUED_SYNC_LIMIT and SET_QUEUED_SYNC_LIMIT
1196 * @tc.type: FUNC
1197 * @tc.require: AR000D4876
1198 * @tc.author: wangchuanqing
1199 */
1200 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue002, TestSize.Level3)
1201 {
1202 /**
1203 * @tc.steps:step1. Set PragmaCmd to be GET_QUEUED_SYNC_LIMIT,
1204 * @tc.expected: step1. Expect return OK, limit eq QUEUED_SYNC_LIMIT_DEFAULT.
1205 */
1206 int limit = 0;
1207 PragmaData input = static_cast<PragmaData>(&limit);
1208 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), OK);
1209 EXPECT_EQ(limit, DBConstant::QUEUED_SYNC_LIMIT_DEFAULT);
1210
1211 /**
1212 * @tc.steps:step2. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be 50
1213 * @tc.expected: step2. Expect return OK.
1214 */
1215 limit = 50;
1216 input = static_cast<PragmaData>(&limit);
1217 EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), OK);
1218
1219 /**
1220 * @tc.steps:step3. Set PragmaCmd to be GET_QUEUED_SYNC_LIMIT,
1221 * @tc.expected: step3. Expect return OK, limit eq 50
1222 */
1223 limit = 0;
1224 input = static_cast<PragmaData>(&limit);
1225 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), OK);
1226 EXPECT_EQ(limit, 50);
1227 }
1228
1229 /**
1230 * @tc.name: SyncQueue003
1231 * @tc.desc: sync queue test
1232 * @tc.type: FUNC
1233 * @tc.require: AR000D4876
1234 * @tc.author: wangchuanqing
1235 */
1236 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue003, TestSize.Level3)
1237 {
1238 DBStatus status = OK;
1239 std::vector<std::string> devices;
1240 devices.push_back(g_deviceB->GetDeviceId());
1241 devices.push_back(g_deviceC->GetDeviceId());
1242
1243 /**
1244 * @tc.steps:step1. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE,
1245 * @tc.expected: step1. Expect return OK, size eq 0.
1246 */
1247 int size;
1248 PragmaData input = static_cast<PragmaData>(&size);
1249 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), OK);
1250 EXPECT_EQ(size, 0);
1251
1252 /**
1253 * @tc.steps:step2. deviceA put {k1, v1}
1254 */
1255 status = g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1256 ASSERT_TRUE(status == OK);
1257
1258 /**
1259 * @tc.steps:step3. deviceA sync SYNC_MODE_PUSH_ONLY
1260 */
1261 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1262 ASSERT_TRUE(status == OK);
1263
1264 /**
1265 * @tc.steps:step4. deviceA put {k2, v2}
1266 */
1267 status = g_kvDelegatePtr->Put(KEY_2, VALUE_2);
1268 ASSERT_TRUE(status == OK);
1269
1270 /**
1271 * @tc.steps:step5. deviceA sync SYNC_MODE_PUSH_ONLY
1272 */
1273 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1274 ASSERT_TRUE(status == OK);
1275
1276 /**
1277 * @tc.steps:step6. deviceB put {k3, v3}
1278 */
1279 g_deviceB->PutData(KEY_3, VALUE_3, 0, 0);
1280
1281 /**
1282 * @tc.steps:step7. deviceA put {k4, v4}
1283 */
1284 status = g_kvDelegatePtr->Put(KEY_4, VALUE_4);
1285 ASSERT_TRUE(status == OK);
1286
1287 /**
1288 * @tc.steps:step8. deviceA sync SYNC_MODE_PUSH_PULL
1289 */
1290 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_PULL, nullptr, false);
1291 ASSERT_TRUE(status == OK);
1292
1293 /**
1294 * @tc.steps:step9. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE,
1295 * @tc.expected: step1. Expect return OK, 0 <= size <= 4
1296 */
1297 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), OK);
1298 ASSERT_TRUE((size >= 0) && (size <= 4));
1299
1300 /**
1301 * @tc.steps:step10. deviceB put {k5, v5}
1302 */
1303 g_deviceB->PutData(KEY_5, VALUE_5, 0, 0);
1304
1305 /**
1306 * @tc.steps:step11. deviceA call sync and wait
1307 * @tc.expected: step11. sync should return OK.
1308 */
1309 std::map<std::string, DBStatus> result;
1310 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
1311 ASSERT_TRUE(status == OK);
1312
1313 /**
1314 * @tc.expected: step11. onComplete should be called, DeviceA,B,C have {k1,v1}~ {KEY_5,VALUE_5}
1315 */
1316 ASSERT_TRUE(result.size() == devices.size());
1317 for (const auto &pair : result) {
1318 EXPECT_TRUE(pair.second == OK);
1319 }
1320 VirtualDataItem item;
1321 g_deviceB->GetData(KEY_1, item);
1322 EXPECT_TRUE(item.value == VALUE_1);
1323 g_deviceB->GetData(KEY_2, item);
1324 EXPECT_TRUE(item.value == VALUE_2);
1325 g_deviceB->GetData(KEY_3, item);
1326 EXPECT_TRUE(item.value == VALUE_3);
1327 g_deviceB->GetData(KEY_4, item);
1328 EXPECT_TRUE(item.value == VALUE_4);
1329 g_deviceB->GetData(KEY_5, item);
1330 EXPECT_TRUE(item.value == VALUE_5);
1331 Value value;
1332 EXPECT_EQ(g_kvDelegatePtr->Get(KEY_3, value), OK);
1333 EXPECT_EQ(VALUE_3, value);
1334 EXPECT_EQ(g_kvDelegatePtr->Get(KEY_5, value), OK);
1335 EXPECT_EQ(VALUE_5, value);
1336 }
1337
1338 /**
1339 * @tc.name: SyncQueue004
1340 * @tc.desc: sync queue full test
1341 * @tc.type: FUNC
1342 * @tc.require: AR000D4876
1343 * @tc.author: wangchuanqing
1344 */
1345 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue004, TestSize.Level3)
1346 {
1347 DBStatus status = OK;
1348 std::vector<std::string> devices;
1349 devices.push_back(g_deviceB->GetDeviceId());
1350 devices.push_back(g_deviceC->GetDeviceId());
1351
1352 /**
1353 * @tc.steps:step1. deviceB C block
1354 */
1355 g_communicatorAggregator->SetBlockValue(true);
1356
1357 /**
1358 * @tc.steps:step2. deviceA put {k1, v1}
1359 */
1360 status = g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1361 ASSERT_TRUE(status == OK);
1362
1363 /**
1364 * @tc.steps:step3. deviceA sync QUEUED_SYNC_LIMIT_DEFAULT times
1365 * @tc.expected: step3. Expect return OK
1366 */
1367 for (int i = 0; i < DBConstant::QUEUED_SYNC_LIMIT_DEFAULT; i++) {
1368 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1369 ASSERT_TRUE(status == OK);
1370 }
1371
1372 /**
1373 * @tc.steps:step4. deviceA sync
1374 * @tc.expected: step4. Expect return BUSY
1375 */
1376 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1377 ASSERT_TRUE(status == BUSY);
1378 g_communicatorAggregator->SetBlockValue(false);
1379 }
1380
1381 /**
1382 * @tc.name: SyncQueue005
1383 * @tc.desc: block sync queue test
1384 * @tc.type: FUNC
1385 * @tc.require: AR000D4876
1386 * @tc.author: wangchuanqing
1387 */
1388 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue005, TestSize.Level3)
1389 {
1390 std::vector<std::string> devices;
1391 devices.push_back(g_deviceB->GetDeviceId());
1392 devices.push_back(g_deviceC->GetDeviceId());
1393 /**
1394 * @tc.steps:step1. New a thread to deviceA call block push sync to deviceB & deviceC,
1395 * but deviceB & C is offline
1396 * @tc.expected: step1. Sync will be blocked util timeout, and then return OK
1397 */
1398 g_deviceB->Offline();
1399 g_deviceC->Offline();
1400 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1401
1402 /**
1403 * @tc.steps:step2. deviceA put {k1, v1}
1404 */
1405 g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1406
1407 std::mutex lockMutex;
1408 std::condition_variable conditionVar;
1409
__anond4119c320402() 1410 std::thread threadFirst([devices]() {
1411 std::map<std::string, DBStatus> resultInner;
1412 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, resultInner, true);
1413 EXPECT_EQ(status, OK);
1414 });
1415 threadFirst.detach();
1416 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1417 /**
1418 * @tc.steps:step3. New a thread to deviceA call block push sync to deviceB & deviceC,
1419 * but deviceB & C is offline
1420 * @tc.expected: step2. Sync will be blocked util timeout, and then return OK
1421 */
__anond4119c320502() 1422 std::thread threadSecond([devices, &lockMutex, &conditionVar]() {
1423 std::map<std::string, DBStatus> resultInner;
1424 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, resultInner, true);
1425 EXPECT_EQ(status, OK);
1426 std::unique_lock<mutex> lockInner(lockMutex);
1427 conditionVar.notify_one();
1428 });
1429 threadSecond.detach();
1430
1431 /**
1432 * @tc.steps:step4. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE,
1433 * @tc.expected: step1. Expect return OK, size eq 0.
1434 */
1435 int size;
1436 PragmaData input = static_cast<PragmaData>(&size);
1437 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), OK);
1438 EXPECT_EQ(size, 0);
1439
1440 /**
1441 * @tc.steps:step5. wait exit
1442 */
1443 std::unique_lock<mutex> lock(lockMutex);
1444 auto now = std::chrono::system_clock::now();
1445 conditionVar.wait_until(lock, now + 2 * INT8_MAX * 1000ms);
1446 }
1447
1448 /**
1449 * @tc.name: CalculateSyncData001
1450 * @tc.desc: Test sync data whose device never synced before
1451 * @tc.type: FUNC
1452 * @tc.require: AR000HI2JS
1453 * @tc.author: zhuwentao
1454 */
1455 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData001, TestSize.Level3)
1456 {
1457 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1458 size_t dataSize = g_kvDelegatePtr->GetSyncDataSize(DEVICE_B);
1459 uint32_t serialHeadLen = 8u;
1460 EXPECT_EQ(static_cast<uint32_t>(dataSize), 0u + serialHeadLen);
1461 uint32_t keySize = 256u;
1462 uint32_t valuesize = 1024u;
1463 uint32_t itemCount = 10u;
1464 CalculateDataTest(itemCount, keySize, valuesize);
1465 }
1466
1467 /**
1468 * @tc.name: CalculateSyncData002
1469 * @tc.desc: Test sync data whose device synced before, but sync data is less than 1M
1470 * @tc.type: FUNC
1471 * @tc.require: AR000HI2JS
1472 * @tc.author: zhuwentao
1473 */
1474 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData002, TestSize.Level3)
1475 {
1476 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1477 Key key1 = {'1'};
1478 Value value1 = {'1'};
1479 EXPECT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
1480
1481 std::vector<std::string> devices;
1482 devices.push_back(g_deviceB->GetDeviceId());
1483 std::map<std::string, DBStatus> result;
1484 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1485 ASSERT_TRUE(status == OK);
1486 ASSERT_TRUE(result.size() == devices.size());
1487 for (const auto &pair : result) {
1488 EXPECT_TRUE(pair.second == OK);
1489 }
1490
1491 uint32_t keySize = 256u;
1492 uint32_t valuesize = 512u;
1493 uint32_t itemCount = 20u;
1494 CalculateDataTest(itemCount, keySize, valuesize);
1495 }
1496
1497 /**
1498 * @tc.name: CalculateSyncData003
1499 * @tc.desc: Test sync data whose device synced before, but sync data is larger than 1M
1500 * @tc.type: FUNC
1501 * @tc.require: AR000HI2JS
1502 * @tc.author: zhuwentao
1503 */
1504 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData003, TestSize.Level3)
1505 {
1506 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1507 Key key1 = {'1'};
1508 Value value1 = {'1'};
1509 EXPECT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
1510
1511 std::vector<std::string> devices;
1512 devices.push_back(g_deviceB->GetDeviceId());
1513 std::map<std::string, DBStatus> result;
1514 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1515 ASSERT_TRUE(status == OK);
1516 ASSERT_TRUE(result.size() == devices.size());
1517 for (const auto &pair : result) {
1518 EXPECT_TRUE(pair.second == OK);
1519 }
1520 uint32_t keySize = 256u;
1521 uint32_t valuesize = 1024u;
1522 uint32_t itemCount = 2048u;
1523 CalculateDataTest(itemCount, keySize, valuesize);
1524 }
1525
1526 /**
1527 * @tc.name: CalculateSyncData004
1528 * @tc.desc: Test invalid device when call GetSyncDataSize interface
1529 * @tc.type: FUNC
1530 * @tc.require: AR000HI2JS
1531 * @tc.author: zhuwentao
1532 */
1533 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData004, TestSize.Level3)
1534 {
1535 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1536 std::string device;
1537 EXPECT_EQ(g_kvDelegatePtr->GetSyncDataSize(device), 0u);
1538 }
1539
1540 /**
1541 * @tc.name: CalculateSyncData005
1542 * @tc.desc: Test CalculateSyncData and rekey Concurrently
1543 * @tc.type: FUNC
1544 * @tc.require: AR000HI2JS
1545 * @tc.author: zhuwentao
1546 */
1547 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData005, TestSize.Level3)
1548 {
1549 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1550 size_t dataSize = 0;
1551 Key key1 = {'1'};
1552 Value value1 = {'1'};
1553 EXPECT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
__anond4119c320602() 1554 std::thread thread1([]() {
1555 std::this_thread::sleep_for(std::chrono::milliseconds(1));
1556 CipherPassword passwd; // random password
1557 vector<uint8_t> passwdBuffer(10, 45); // 10 and 45 as random password.
1558 passwd.SetValue(passwdBuffer.data(), passwdBuffer.size());
1559 g_kvDelegatePtr->Rekey(passwd);
1560 });
__anond4119c320702() 1561 std::thread thread2([&dataSize, &key1, &value1]() {
1562 dataSize = g_kvDelegatePtr->GetSyncDataSize(DEVICE_B);
1563 if (dataSize > 0) {
1564 uint32_t expectedDataSize = (key1.size() + value1.size());
1565 uint32_t externalSize = 70u;
1566 uint32_t serialHeadLen = 8u;
1567 ASSERT_GE(static_cast<uint32_t>(dataSize), expectedDataSize);
1568 ASSERT_LE(static_cast<uint32_t>(dataSize), serialHeadLen + expectedDataSize + externalSize);
1569 }
1570 });
1571 thread1.join();
1572 thread2.join();
1573 }