• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <condition_variable>
17 #include <gtest/gtest.h>
18 #include <thread>
19 
20 #include "db_constant.h"
21 #include "db_common.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "platform_specific.h"
27 #include "single_ver_data_sync.h"
28 #include "single_ver_kv_sync_task_context.h"
29 
30 using namespace testing::ext;
31 using namespace DistributedDB;
32 using namespace DistributedDBUnitTest;
33 using namespace std;
34 
35 namespace {
36     string g_testDir;
37     const string STORE_ID = "kv_stroe_sync_test";
38     const int64_t TIME_OFFSET = 5000000;
39     const int WAIT_TIME = 1000;
40     const std::string DEVICE_A = "real_device";
41     const std::string DEVICE_B = "deviceB";
42     const std::string DEVICE_C = "deviceC";
43 
44     KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
45     KvStoreConfig g_config;
46     DistributedDBToolsUnitTest g_tool;
47     DBStatus g_kvDelegateStatus = INVALID_ARGS;
48     KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
49     VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
50     KvVirtualDevice *g_deviceB = nullptr;
51     KvVirtualDevice *g_deviceC = nullptr;
52 
53     // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
54     auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
55         placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
56 
CalculateDataTest(uint32_t itemCount,uint32_t keySize,uint32_t valueSize)57     void CalculateDataTest(uint32_t itemCount, uint32_t keySize, uint32_t valueSize)
58     {
59         for (uint32_t i = 0; i < itemCount; i++) {
60             std::vector<uint8_t> prefixKey = {'a', 'b', 'c'};
61             Key key = DistributedDBToolsUnitTest::GetRandPrefixKey(prefixKey, keySize);
62             Value value;
63             DistributedDBToolsUnitTest::GetRandomKeyValue(value, valueSize);
64             EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
65         }
66         size_t dataSize = g_kvDelegatePtr->GetSyncDataSize(DEVICE_B);
67         uint32_t expectedDataSize = (valueSize + keySize);
68         uint32_t externalSize = 70u;
69         uint32_t serialHeadLen = 8u;
70         LOGI("expectedDataSize=%u, v=%u", expectedDataSize, externalSize);
71         uint32_t maxDataSize = 1024u * 1024u;
72         if (itemCount * expectedDataSize >= maxDataSize) {
73             EXPECT_EQ(static_cast<uint32_t>(dataSize), maxDataSize);
74             return;
75         }
76         ASSERT_GE(static_cast<uint32_t>(dataSize), itemCount * expectedDataSize);
77         ASSERT_LE(static_cast<uint32_t>(dataSize), serialHeadLen + itemCount * (expectedDataSize + externalSize));
78     }
79 }
80 
81 class DistributedDBSingleVerP2PSimpleSyncTest : public testing::Test {
82 public:
83     static void SetUpTestCase(void);
84     static void TearDownTestCase(void);
85     void SetUp();
86     void TearDown();
87 };
88 
SetUpTestCase(void)89 void DistributedDBSingleVerP2PSimpleSyncTest::SetUpTestCase(void)
90 {
91     /**
92      * @tc.setup: Init datadir and Virtual Communicator.
93      */
94     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
95     g_config.dataDir = g_testDir;
96     g_mgr.SetKvStoreConfig(g_config);
97 
98     string dir = g_testDir + "/single_ver";
99     DIR* dirTmp = opendir(dir.c_str());
100     if (dirTmp == nullptr) {
101         OS::MakeDBDirectory(dir);
102     } else {
103         closedir(dirTmp);
104     }
105 
106     g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
107     ASSERT_TRUE(g_communicatorAggregator != nullptr);
108     RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
109 }
110 
TearDownTestCase(void)111 void DistributedDBSingleVerP2PSimpleSyncTest::TearDownTestCase(void)
112 {
113     /**
114      * @tc.teardown: Release virtual Communicator and clear data dir.
115      */
116     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
117         LOGE("rm test db files error!");
118     }
119     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
120 }
121 
SetUp(void)122 void DistributedDBSingleVerP2PSimpleSyncTest::SetUp(void)
123 {
124     DistributedDBToolsUnitTest::PrintTestCaseInfo();
125     /**
126      * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
127      */
128     KvStoreNbDelegate::Option option;
129     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
130     ASSERT_TRUE(g_kvDelegateStatus == OK);
131     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
132     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
133     ASSERT_TRUE(g_deviceB != nullptr);
134     VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
135     ASSERT_TRUE(syncInterfaceB != nullptr);
136     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
137 
138     g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
139     ASSERT_TRUE(g_deviceC != nullptr);
140     VirtualSingleVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
141     ASSERT_TRUE(syncInterfaceC != nullptr);
142     ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
143 
144     auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
145         const std::string &deviceId, uint8_t flag) -> bool {
146             return true;
147         };
148     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
149 }
150 
TearDown(void)151 void DistributedDBSingleVerP2PSimpleSyncTest::TearDown(void)
152 {
153     /**
154      * @tc.teardown: Release device A, B, C
155      */
156     if (g_kvDelegatePtr != nullptr) {
157         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
158         g_kvDelegatePtr = nullptr;
159         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
160         LOGD("delete kv store status %d", status);
161         ASSERT_TRUE(status == OK);
162     }
163     if (g_deviceB != nullptr) {
164         delete g_deviceB;
165         g_deviceB = nullptr;
166     }
167     if (g_deviceC != nullptr) {
168         delete g_deviceC;
169         g_deviceC = nullptr;
170     }
171     PermissionCheckCallbackV2 nullCallback;
172     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
173 }
174 
175 /**
176  * @tc.name: Normal Sync 001
177  * @tc.desc: Test normal push sync for add data.
178  * @tc.type: FUNC
179  * @tc.require: AR000CQS3S SR000CQE0B
180  * @tc.author: xushaohua
181  */
182 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync001, TestSize.Level1)
183 {
184     DBStatus status = OK;
185     std::vector<std::string> devices;
186     devices.push_back(g_deviceB->GetDeviceId());
187     devices.push_back(g_deviceC->GetDeviceId());
188 
189     /**
190      * @tc.steps: step1. deviceA put {k1, v1}
191      */
192     Key key = {'1'};
193     Value value = {'1'};
194     status = g_kvDelegatePtr->Put(key, value);
195     ASSERT_TRUE(status == OK);
196 
197     /**
198      * @tc.steps: step2. deviceA call sync and wait
199      * @tc.expected: step2. sync should return OK.
200      */
201     std::map<std::string, DBStatus> result;
202     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
203     ASSERT_TRUE(status == OK);
204 
205     /**
206      * @tc.expected: step2. onComplete should be called, DeviceB,C have {k1,v1}
207      */
208     ASSERT_TRUE(result.size() == devices.size());
209     for (const auto &pair : result) {
210         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
211         EXPECT_TRUE(pair.second == OK);
212     }
213     VirtualDataItem item;
214     g_deviceB->GetData(key, item);
215     EXPECT_TRUE(item.value == value);
216     g_deviceC->GetData(key, item);
217     EXPECT_TRUE(item.value == value);
218 }
219 
220 /**
221  * @tc.name: Normal Sync 002
222  * @tc.desc: Test normal push sync for update data.
223  * @tc.type: FUNC
224  * @tc.require: AR000CCPOM
225  * @tc.author: xushaohua
226  */
227 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync002, TestSize.Level1)
228 {
229     DBStatus status = OK;
230     std::vector<std::string> devices;
231     devices.push_back(g_deviceB->GetDeviceId());
232     devices.push_back(g_deviceC->GetDeviceId());
233 
234     /**
235      * @tc.steps: step1. deviceA put {k1, v1}
236      */
237     Key key = {'1'};
238     Value value = {'1'};
239     status = g_kvDelegatePtr->Put(key, value);
240     ASSERT_TRUE(status == OK);
241 
242     /**
243      * @tc.steps: step2. deviceA put {k1, v2}
244      */
245     Value value2;
246     value2.push_back('2');
247     status = g_kvDelegatePtr->Put(key, value2);
248     ASSERT_TRUE(status == OK);
249 
250     /**
251      * @tc.steps: step3. deviceA call sync and wait
252      * @tc.expected: step3. sync should return OK.
253      */
254     std::map<std::string, DBStatus> result;
255     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
256     ASSERT_TRUE(status == OK);
257 
258     /**
259      * @tc.expected: step3. onComplete should be called, DeviceB,C have {k1,v2}
260      */
261     ASSERT_TRUE(result.size() == devices.size());
262     for (const auto &pair : result) {
263         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
264         EXPECT_TRUE(pair.second == OK);
265     }
266     VirtualDataItem item;
267     g_deviceC->GetData(key, item);
268     EXPECT_TRUE(item.value == value2);
269     g_deviceB->GetData(key, item);
270     EXPECT_TRUE(item.value == value2);
271 }
272 
273 /**
274  * @tc.name: Normal Sync 003
275  * @tc.desc: Test normal push sync for delete data.
276  * @tc.type: FUNC
277  * @tc.require: AR000CQS3S
278  * @tc.author: xushaohua
279  */
280 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync003, TestSize.Level1)
281 {
282     DBStatus status = OK;
283     std::vector<std::string> devices;
284     devices.push_back(g_deviceB->GetDeviceId());
285     devices.push_back(g_deviceC->GetDeviceId());
286 
287     /**
288      * @tc.steps: step1. deviceA put {k1, v1}
289      */
290     Key key = {'1'};
291     Value value = {'1'};
292     status = g_kvDelegatePtr->Put(key, value);
293     ASSERT_TRUE(status == OK);
294 
295     /**
296      * @tc.steps: step2. deviceA delete k1
297      */
298     status = g_kvDelegatePtr->Delete(key);
299     ASSERT_TRUE(status == OK);
300     std::map<std::string, DBStatus> result;
301     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
302     ASSERT_TRUE(status == OK);
303 
304     /**
305      * @tc.steps: step3. deviceA call sync and wait
306      * @tc.expected: step3. sync should return OK.
307      */
308     ASSERT_TRUE(result.size() == devices.size());
309     for (const auto &pair : result) {
310         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
311         EXPECT_TRUE(pair.second == OK);
312     }
313 
314     /**
315      * @tc.expected: step3. onComplete should be called, DeviceB,C have {k1, delete}
316      */
317     VirtualDataItem item;
318     Key hashKey;
319     DistributedDBToolsUnitTest::CalcHash(key, hashKey);
320     EXPECT_EQ(g_deviceB->GetData(hashKey, item), -E_NOT_FOUND);
321     EXPECT_EQ(g_deviceC->GetData(hashKey, item), -E_NOT_FOUND);
322 }
323 
324 /**
325  * @tc.name: Normal Sync 004
326  * @tc.desc: Test normal pull sync for add data.
327  * @tc.type: FUNC
328  * @tc.require: AR000CCPOM
329  * @tc.author: xushaohua
330  */
331 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync004, TestSize.Level1)
332 {
333     DBStatus status = OK;
334     std::vector<std::string> devices;
335     devices.push_back(g_deviceB->GetDeviceId());
336     devices.push_back(g_deviceC->GetDeviceId());
337 
338     /**
339      * @tc.steps: step1. deviceB put {k1, v1}
340      */
341     Key key = {'1'};
342     Value value = {'1'};
343     g_deviceB->PutData(key, value, 0, 0);
344 
345     /**
346      * @tc.steps: step2. deviceB put {k2, v2}
347      */
348     Key key2 = {'2'};
349     Value value2 = {'2'};
350     g_deviceC->PutData(key2, value2, 0, 0);
351     ASSERT_TRUE(status == OK);
352 
353     /**
354      * @tc.steps: step3. deviceA call pull sync
355      * @tc.expected: step3. sync should return OK.
356      */
357     std::map<std::string, DBStatus> result;
358     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
359     ASSERT_TRUE(status == OK);
360 
361     /**
362      * @tc.expected: step3. onComplete should be called, DeviceA have {k1, VALUE_1}, {K2. VALUE_2}
363      */
364     ASSERT_TRUE(result.size() == devices.size());
365     for (const auto &pair : result) {
366         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
367         EXPECT_TRUE(pair.second == OK);
368     }
369     Value value3;
370     EXPECT_EQ(g_kvDelegatePtr->Get(key, value3), OK);
371     EXPECT_EQ(value3, value);
372     EXPECT_EQ(g_kvDelegatePtr->Get(key2, value3), OK);
373     EXPECT_EQ(value3, value2);
374 }
375 
376 /**
377  * @tc.name: Normal Sync 005
378  * @tc.desc: Test normal pull sync for update data.
379  * @tc.type: FUNC
380  * @tc.require: AR000CCPOM SR000CQE10
381  * @tc.author: xushaohua
382  */
383 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync005, TestSize.Level2)
384 {
385     DBStatus status = OK;
386     std::vector<std::string> devices;
387     devices.push_back(g_deviceB->GetDeviceId());
388     devices.push_back(g_deviceC->GetDeviceId());
389 
390     /**
391      * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
392      */
393     Key key1 = {'1'};
394     Value value1 = {'1'};
395     status = g_kvDelegatePtr->Put(key1, value1);
396     ASSERT_TRUE(status == OK);
397     Key key2 = {'2'};
398     Value value2 = {'2'};
399     status = g_kvDelegatePtr->Put(key2, value2);
400     ASSERT_TRUE(status == OK);
401 
402     /**
403      * @tc.steps: step2. deviceB put {k1, v3} t2, t2 > t1
404      */
405     Value value3;
406     value3.push_back('3');
407     g_deviceB->PutData(key1, value3,
408         TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 0);
409 
410     /**
411      * @tc.steps: step3. deviceC put {k2, v4} t2, t4 < t1
412      */
413     Value value4;
414     value4.push_back('4');
415     g_deviceC->PutData(key2, value4,
416         TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 0);
417 
418     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
419     /**
420      * @tc.steps: step4. deviceA call pull sync
421      * @tc.expected: step4. sync should return OK.
422      */
423     std::map<std::string, DBStatus> result;
424     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
425     ASSERT_TRUE(status == OK);
426 
427     /**
428      * @tc.expected: step4. onComplete should be called, DeviceA have {k1, v3}, {k2. v2}
429      */
430     ASSERT_TRUE(result.size() == devices.size());
431     for (const auto &pair : result) {
432         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
433         EXPECT_TRUE(pair.second == OK);
434     }
435 
436     Value value5;
437     g_kvDelegatePtr->Get(key1, value5);
438     EXPECT_TRUE(value5 == value3);
439     g_kvDelegatePtr->Get(key2, value5);
440     EXPECT_TRUE(value5 == value2);
441 }
442 
443 /**
444  * @tc.name: Normal Sync 006
445  * @tc.desc: Test normal pull sync for delete data.
446  * @tc.type: FUNC
447  * @tc.require: AR000CQS3S
448  * @tc.author: xushaohua
449  */
450 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync006, TestSize.Level2)
451 {
452     /**
453      * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
454      */
455     Key key1 = {'1'};
456     Value value1 = {'1'};
457     DBStatus status = g_kvDelegatePtr->Put(key1, value1);
458     ASSERT_TRUE(status == OK);
459     Key key2 = {'2'};
460     Value value2 = {'2'};
461     status = g_kvDelegatePtr->Put(key2, value2);
462     ASSERT_TRUE(status == OK);
463 
464     /**
465      * @tc.steps: step2. deviceA put {k1, delete} t2, t2 <t1
466      */
467     Key hashKey1;
468     DistributedDBToolsUnitTest::CalcHash(key1, hashKey1);
469     g_deviceB->PutData(hashKey1, value1,
470         TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 1);
471 
472     /**
473      * @tc.steps: step3. deviceA put {k1, delete} t3, t3 < t1
474      */
475     Key hashKey2;
476     DistributedDBToolsUnitTest::CalcHash(key2, hashKey2);
477     g_deviceC->PutData(hashKey2, value1,
478         TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 0);
479 
480     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
481     /**
482      * @tc.steps: step4. deviceA call pull sync
483      * @tc.expected: step4. sync should return OK.
484      */
485     std::map<std::string, DBStatus> result;
486     std::vector<std::string> devices;
487     devices.push_back(g_deviceB->GetDeviceId());
488     devices.push_back(g_deviceC->GetDeviceId());
489     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
490     ASSERT_TRUE(status == OK);
491 
492     /**
493      * @tc.expected: step4. onComplete should be called, DeviceA have {k2. v2} don't have k1
494      */
495     ASSERT_TRUE(result.size() == devices.size());
496     for (const auto &pair : result) {
497         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
498         EXPECT_TRUE(pair.second == OK);
499     }
500     Value value5;
501     g_kvDelegatePtr->Get(key1, value5);
502     EXPECT_TRUE(value5.empty());
503     g_kvDelegatePtr->Get(key2, value5);
504     EXPECT_TRUE(value5 == value2);
505 }
506 
507 /**
508  * @tc.name: Normal Sync 007
509  * @tc.desc: Test normal push_pull sync for add data.
510  * @tc.type: FUNC
511  * @tc.require: AR000CCPOM
512  * @tc.author: xushaohua
513  */
514 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync007, TestSize.Level1)
515 {
516     DBStatus status = OK;
517     std::vector<std::string> devices;
518     devices.push_back(g_deviceB->GetDeviceId());
519     devices.push_back(g_deviceC->GetDeviceId());
520 
521     /**
522      * @tc.steps: step1. deviceA put {k1, v1}
523      */
524     Key key1 = {'1'};
525     Value value1 = {'1'};
526     status = g_kvDelegatePtr->Put(key1, value1);
527     EXPECT_TRUE(status == OK);
528 
529     /**
530      * @tc.steps: step1. deviceB put {k2, v2}
531      */
532     Key key2 = {'2'};
533     Value value2 = {'2'};
534     g_deviceB->PutData(key2, value2, 0, 0);
535 
536     /**
537      * @tc.steps: step1. deviceB put {k3, v3}
538      */
539     Key key3 = {'3'};
540     Value value3 = {'3'};
541     g_deviceC->PutData(key3, value3, 0, 0);
542 
543     /**
544      * @tc.steps: step4. deviceA call push_pull sync
545      * @tc.expected: step4. sync should return OK.
546      */
547     std::map<std::string, DBStatus> result;
548     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
549     ASSERT_TRUE(status == OK);
550 
551     ASSERT_TRUE(result.size() == devices.size());
552     for (const auto &pair : result) {
553         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
554         EXPECT_TRUE(pair.second == OK);
555     }
556 
557     /**
558      * @tc.expected: step4. onComplete should be called, DeviceA have {k1. v1}, {k2, v2}, {k3, v3}
559      *     deviceB received {k1. v1}, don't received k3, deviceC received {k1. v1}, don't received k2
560      */
561     Value value4;
562     g_kvDelegatePtr->Get(key2, value4);
563     EXPECT_TRUE(value4 == value2);
564     g_kvDelegatePtr->Get(key3, value4);
565     EXPECT_TRUE(value4 == value3);
566 
567     VirtualDataItem item1;
568     g_deviceB->GetData(key1, item1);
569     EXPECT_TRUE(item1.value == value1);
570     item1.value.clear();
571     g_deviceB->GetData(key3, item1);
572     EXPECT_TRUE(item1.value.empty());
573 
574     VirtualDataItem item2;
575     g_deviceC->GetData(key1, item2);
576     EXPECT_TRUE(item2.value == value1);
577     item2.value.clear();
578     g_deviceC->GetData(key2, item2);
579     EXPECT_TRUE(item2.value.empty());
580 }
581 
582 /**
583  * @tc.name: Normal Sync 008
584  * @tc.desc: Test normal push_pull sync for update data.
585  * @tc.type: FUNC
586  * @tc.require: AR000CCPOM
587  * @tc.author: xushaohua
588  */
589 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync008, TestSize.Level2)
590 {
591     DBStatus status = OK;
592     std::vector<std::string> devices;
593     devices.push_back(g_deviceB->GetDeviceId());
594     devices.push_back(g_deviceC->GetDeviceId());
595 
596     /**
597      * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
598      */
599     Key key1 = {'1'};
600     Value value1 = {'1'};
601     status = g_kvDelegatePtr->Put(key1, value1);
602     ASSERT_TRUE(status == OK);
603 
604     Key key2 = {'2'};
605     Value value2 = {'2'};
606     status = g_kvDelegatePtr->Put(key2, value2);
607     ASSERT_TRUE(status == OK);
608 
609     /**
610      * @tc.steps: step2. deviceB put {k1, v3} t2, t2 > t1
611      */
612     Value value3 = {'3'};
613     g_deviceB->PutData(key1, value3,
614         TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 0);
615 
616     /**
617      * @tc.steps: step3. deviceB put {k1, v4} t3, t4 <t1
618      */
619     Value value4 = {'4'};
620     g_deviceC->PutData(key2, value4,
621         TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 0);
622     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
623 
624     /**
625      * @tc.steps: step4. deviceA call push_pull sync
626      * @tc.expected: step4. sync should return OK.
627      */
628     std::map<std::string, DBStatus> result;
629     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
630     ASSERT_TRUE(status == OK);
631     ASSERT_TRUE(result.size() == devices.size());
632     for (const auto &pair : result) {
633         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
634         EXPECT_TRUE(pair.second == OK);
635     }
636 
637     /**
638      * @tc.expected: step4. onComplete should be called, DeviceA have {k1. v3}, {k2, v2}
639      *     deviceB have {k1. v3}, deviceC have {k2. v2}
640      */
641     Value value5;
642     g_kvDelegatePtr->Get(key1, value5);
643     EXPECT_EQ(value5, value3);
644     g_kvDelegatePtr->Get(key2, value5);
645     EXPECT_EQ(value5, value2);
646 
647     VirtualDataItem item1;
648     g_deviceB->GetData(key1, item1);
649     EXPECT_TRUE(item1.value == value3);
650     item1.value.clear();
651     g_deviceB->GetData(key2, item1);
652     EXPECT_TRUE(item1.value == value2);
653 
654     VirtualDataItem item2;
655     g_deviceC->GetData(key2, item2);
656     EXPECT_TRUE(item2.value == value2);
657 }
658 
659 /**
660  * @tc.name: Normal Sync 009
661  * @tc.desc: Test normal push_pull sync for delete data.
662  * @tc.type: FUNC
663  * @tc.require: AR000CCPOM
664  * @tc.author: xushaohua
665  */
666 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync009, TestSize.Level2)
667 {
668     DBStatus status = OK;
669     std::vector<std::string> devices;
670     devices.push_back(g_deviceB->GetDeviceId());
671     devices.push_back(g_deviceC->GetDeviceId());
672 
673     /**
674      * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
675      */
676     Key key1 = {'1'};
677     Value value1 = {'1'};
678     status = g_kvDelegatePtr->Put(key1, value1);
679     ASSERT_TRUE(status == OK);
680 
681     Key key2 = {'2'};
682     Value value2 = {'2'};
683     status = g_kvDelegatePtr->Put(key2, value2);
684     ASSERT_TRUE(status == OK);
685 
686     /**
687      * @tc.steps: step2. deviceB put {k1, delete} t2, t2 > t1
688      */
689     Key hashKey1;
690     DistributedDBToolsUnitTest::CalcHash(key1, hashKey1);
691     g_deviceB->PutData(hashKey1, value1,
692         TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 1);
693 
694     /**
695      * @tc.steps: step3. deviceB put {k1, delete} t3, t2 < t1
696      */
697     Key hashKey2;
698     DistributedDBToolsUnitTest::CalcHash(key2, hashKey2);
699     g_deviceC->PutData(hashKey2, value2,
700         TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 1);
701 
702     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
703     /**
704      * @tc.steps: step4. deviceA call push_pull sync
705      * @tc.expected: step4. sync should return OK.
706      */
707     std::map<std::string, DBStatus> result;
708     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
709     ASSERT_TRUE(status == OK);
710 
711     /**
712      * @tc.expected: step4. onComplete should be called, DeviceA have {k1. delete}, {k2, v2}
713      *     deviceB have {k2. v2}, deviceC have {k2. v2}
714      */
715     ASSERT_TRUE(result.size() == devices.size());
716     for (const auto &pair : result) {
717         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
718         EXPECT_TRUE(pair.second == OK);
719     }
720 
721     Value value3;
722     g_kvDelegatePtr->Get(key1, value3);
723     EXPECT_TRUE(value3.empty());
724     value3.clear();
725     g_kvDelegatePtr->Get(key2, value3);
726     EXPECT_TRUE(value3 == value2);
727 
728     VirtualDataItem item1;
729     g_deviceB->GetData(key2, item1);
730     EXPECT_TRUE(item1.value == value2);
731 
732     VirtualDataItem item2;
733     g_deviceC->GetData(key2, item2);
734     EXPECT_TRUE(item2.value == value2);
735 }
736 
737 /**
738  * @tc.name: Normal Sync 010
739  * @tc.desc: Test sync failed by invalid devices.
740  * @tc.type: FUNC
741  * @tc.require: AR000CCPOM
742  * @tc.author: zhangqiquan
743  */
744 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync010, TestSize.Level1)
745 {
746     DBStatus status = OK;
747     std::vector<std::string> devices;
748     std::string invalidDev = std::string(DBConstant::MAX_DEV_LENGTH + 1, '0');
749     devices.push_back(DEVICE_A);
750     devices.push_back(g_deviceB->GetDeviceId());
751     devices.push_back(invalidDev);
752 
753     std::map<std::string, DBStatus> result;
754     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
755     ASSERT_TRUE(status == OK);
756 
757     ASSERT_EQ(result.size(), devices.size());
758     EXPECT_EQ(result[DEVICE_A], INVALID_ARGS);
759     EXPECT_EQ(result[invalidDev], INVALID_ARGS);
760     EXPECT_EQ(result[DEVICE_B], OK);
761 }
762 
763 /**
764  * @tc.name: Limit Data Sync 001
765  * @tc.desc: Test sync limit key and value data
766  * @tc.type: FUNC
767  * @tc.require: AR000CCPOM
768  * @tc.author: xushaohua
769  */
770 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, LimitDataSync001, TestSize.Level1)
771 {
772     DBStatus status = OK;
773     std::vector<std::string> devices;
774     devices.push_back(g_deviceB->GetDeviceId());
775 
776     Key key1;
777     Value value1;
778     DistributedDBToolsUnitTest::GetRandomKeyValue(key1, DBConstant::MAX_KEY_SIZE + 1);
779     DistributedDBToolsUnitTest::GetRandomKeyValue(value1, DBConstant::MAX_VALUE_SIZE + 1);
780 
781     Key key2;
782     Value value2;
783     DistributedDBToolsUnitTest::GetRandomKeyValue(key2, DBConstant::MAX_KEY_SIZE);
784     DistributedDBToolsUnitTest::GetRandomKeyValue(value2, DBConstant::MAX_VALUE_SIZE);
785 
786     /**
787      * @tc.steps: step1. deviceB put {k1, v1}, K1 > 1k, v1 > 4M
788      */
789     g_deviceB->PutData(key1, value1, 0, 0);
790 
791     /**
792      * @tc.steps: step2. deviceB put {k2, v2}, K2 = 1k, v2 = 4M
793      */
794     g_deviceC->PutData(key2, value2, 0, 0);
795 
796     /**
797      * @tc.steps: step3. deviceA call pull sync from device B
798      * @tc.expected: step3. sync should return OK.
799      */
800     std::map<std::string, DBStatus> result;
801     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
802     ASSERT_TRUE(status == OK);
803 
804     /**
805      * @tc.expected: step3. onComplete should be called.
806      */
807     ASSERT_TRUE(result.size() == devices.size());
808     for (const auto &pair : result) {
809         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
810         if (pair.first == g_deviceB->GetDeviceId()) {
811             EXPECT_TRUE(pair.second != OK);
812         } else {
813             EXPECT_TRUE(pair.second == OK);
814         }
815     }
816 
817     /**
818      * @tc.steps: step4. deviceA call pull sync from deviceC
819      * @tc.expected: step4. sync should return OK.
820      */
821     devices.clear();
822     result.clear();
823     devices.push_back(g_deviceC->GetDeviceId());
824     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
825     ASSERT_TRUE(status == OK);
826 
827     /**
828      * @tc.expected: step4. onComplete should be called, DeviceA have {k2. v2}, don't have {k1, v1}
829      */
830     ASSERT_TRUE(result.size() == devices.size());
831     for (const auto &pair : result) {
832         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
833         EXPECT_TRUE(pair.second == OK);
834     }
835 
836     // Get value from A
837     Value valueRead;
838     EXPECT_TRUE(g_kvDelegatePtr->Get(key1, valueRead) != OK);
839     valueRead.clear();
840     EXPECT_EQ(g_kvDelegatePtr->Get(key2, valueRead), OK);
841     EXPECT_TRUE(valueRead == value2);
842 }
843 
844 /**
845  * @tc.name: Auto Sync 001
846  * @tc.desc: Verify auto sync enable function.
847  * @tc.type: FUNC
848  * @tc.require: AR000CKRTD AR000CQE0E
849  * @tc.author: xushaohua
850  */
851 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, AutoSync001, TestSize.Level1)
852 {
853     std::vector<std::string> devices;
854     devices.push_back(g_deviceB->GetDeviceId());
855     devices.push_back(g_deviceC->GetDeviceId());
856 
857     /**
858      * @tc.steps: step1. enable auto sync
859      * @tc.expected: step1, Pragma return OK.
860      */
861     bool autoSync = true;
862     PragmaData data = static_cast<PragmaData>(&autoSync);
863     DBStatus status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
864     ASSERT_EQ(status, OK);
865 
866     /**
867      * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
868      */
869     ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
870     ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_2, VALUE_2) == OK);
871 
872     /**
873      * @tc.steps: step3. sleep for data sync
874      * @tc.expected: step3. deviceB,C has {k1, v1}, {k2, v2}
875      */
876     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
877     VirtualDataItem item;
878     g_deviceB->GetData(KEY_1, item);
879     EXPECT_EQ(item.value, VALUE_1);
880     g_deviceB->GetData(KEY_2, item);
881     EXPECT_EQ(item.value, VALUE_2);
882     g_deviceC->GetData(KEY_1, item);
883     EXPECT_EQ(item.value, VALUE_1);
884     g_deviceC->GetData(KEY_2, item);
885     EXPECT_EQ(item.value, VALUE_2);
886 }
887 
888 /**
889  * @tc.name: Auto Sync 002
890  * @tc.desc: Verify auto sync disable function.
891  * @tc.type: FUNC
892  * @tc.require: AR000CKRTD AR000CQE0E
893  * @tc.author: xushaohua
894  */
895 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, AutoSync002, TestSize.Level1)
896 {
897     std::vector<std::string> devices;
898     devices.push_back(g_deviceB->GetDeviceId());
899     devices.push_back(g_deviceC->GetDeviceId());
900 
901     /**
902      * @tc.steps: step1. disable auto sync
903      * @tc.expected: step1, Pragma return OK.
904      */
905     bool autoSync = false;
906     PragmaData data = static_cast<PragmaData>(&autoSync);
907     DBStatus status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
908     ASSERT_EQ(status, OK);
909 
910     /**
911      * @tc.steps: step2. deviceB put {k1, v1}, deviceC put {k2, v2}
912      */
913     g_deviceB->PutData(KEY_1, VALUE_1, 0, 0);
914     g_deviceC->PutData(KEY_2, VALUE_2, 0, 0);
915 
916     /**
917      * @tc.steps: step3. sleep for data sync
918      * @tc.expected: step3. deviceA don't have k1, k2.
919      */
920     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
921     Value value3;
922     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_1, value3) == NOT_FOUND);
923     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_2, value3) == NOT_FOUND);
924 }
925 
926 /**
927  * @tc.name: Block Sync 001
928  * @tc.desc: Verify block push sync function.
929  * @tc.type: FUNC
930  * @tc.require: AR000CKRTD AR000CQE0E
931  * @tc.author: xushaohua
932  */
933 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync001, TestSize.Level1)
934 {
935     std::vector<std::string> devices;
936     devices.push_back(g_deviceB->GetDeviceId());
937     devices.push_back(g_deviceC->GetDeviceId());
938 
939     /**
940      * @tc.steps: step1. deviceA put {k1, v1}
941      */
942     g_kvDelegatePtr->Put(KEY_1, VALUE_1);
943 
944     /**
945      * @tc.steps: step2. deviceA call block push sync to deviceB & deviceC.
946      * @tc.expected: step2. Sync return OK, devices status OK, deviceB & deivceC has {k1, v1}.
947      */
948     std::map<std::string, DBStatus> result;
949     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
950     ASSERT_EQ(status, OK);
951     ASSERT_TRUE(result.size() == devices.size());
952     for (const auto &pair : result) {
953         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
954         EXPECT_TRUE(pair.second == OK);
955     }
956     VirtualDataItem item1;
957     EXPECT_EQ(g_deviceB->GetData(KEY_1, item1), OK);
958     EXPECT_EQ(item1.value, VALUE_1);
959     VirtualDataItem item2;
960     EXPECT_EQ(g_deviceC->GetData(KEY_1, item2), OK);
961     EXPECT_EQ(item2.value, VALUE_1);
962 }
963 
964 /**
965  * @tc.name:  Block Sync 002
966  * @tc.desc: Verify block pull sync function.
967  * @tc.type: FUNC
968  * @tc.require: AR000CKRTD AR000CQE0E
969  * @tc.author: xushaohua
970  */
971 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync002, TestSize.Level1)
972 {
973     std::vector<std::string> devices;
974     devices.push_back(g_deviceB->GetDeviceId());
975     devices.push_back(g_deviceC->GetDeviceId());
976 
977     /**
978      * @tc.steps: step1. deviceB put {k1, v1}, deviceC put {k2, v2}
979      */
980     g_deviceB->PutData(KEY_1, VALUE_1, 0, 0);
981     g_deviceC->PutData(KEY_2, VALUE_2, 0, 0);
982 
983     /**
984      * @tc.steps: step2. deviceA call block pull and pull sync to deviceB & deviceC.
985      * @tc.expected: step2. Sync return OK, devices status OK, deviceA has {k1, v1}, {k2, v2}
986      */
987     std::map<std::string, DBStatus> result;
988     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
989     ASSERT_EQ(status, OK);
990     ASSERT_TRUE(result.size() == devices.size());
991     for (const auto &pair : result) {
992         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
993         EXPECT_TRUE(pair.second == OK);
994     }
995     Value value3;
996     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_1, value3) == OK);
997     EXPECT_TRUE(value3 == VALUE_1);
998     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_2, value3) == OK);
999     EXPECT_TRUE(value3 == VALUE_2);
1000 }
1001 
1002 /**
1003  * @tc.name:  Block Sync 003
1004  * @tc.desc: Verify block push and pull sync function.
1005  * @tc.type: FUNC
1006  * @tc.require: AR000CKRTD AR000CQE0E
1007  * @tc.author: xushaohua
1008  */
1009 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync003, TestSize.Level1)
1010 {
1011     std::vector<std::string> devices;
1012     devices.push_back(g_deviceB->GetDeviceId());
1013     devices.push_back(g_deviceC->GetDeviceId());
1014 
1015     /**
1016      * @tc.steps: step1. deviceA put {k1, v1}
1017      */
1018     g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1019 
1020     /**
1021      * @tc.steps: step2. deviceB put {k1, v1}, deviceB put {k2, v2}
1022      */
1023     g_deviceB->PutData(KEY_2, VALUE_2, 0, 0);
1024     g_deviceC->PutData(KEY_3, VALUE_3, 0, 0);
1025 
1026     /**
1027      * @tc.steps: step3. deviceA call block pull and pull sync to deviceB & deviceC.
1028      * @tc.expected: step3. Sync return OK, devices status OK, deviceA has {k1, v1}, {k2, v2} {k3, v3}
1029      *      deviceB has {k1, v1}, {k2. v2} , deviceC has {k1, v1}, {k3, v3}
1030      */
1031     std::map<std::string, DBStatus> result;
1032     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, true);
1033     ASSERT_EQ(status, OK);
1034     ASSERT_TRUE(result.size() == devices.size());
1035     for (const auto &pair : result) {
1036         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1037         EXPECT_TRUE(pair.second == OK);
1038     }
1039 
1040     VirtualDataItem item1;
1041     g_deviceB->GetData(KEY_1, item1);
1042     EXPECT_TRUE(item1.value == VALUE_1);
1043     g_deviceB->GetData(KEY_2, item1);
1044     EXPECT_TRUE(item1.value == VALUE_2);
1045 
1046     VirtualDataItem item2;
1047     g_deviceC->GetData(KEY_1, item2);
1048     EXPECT_TRUE(item2.value == VALUE_1);
1049     g_deviceC->GetData(KEY_3, item2);
1050     EXPECT_TRUE(item2.value == VALUE_3);
1051 
1052     Value value3;
1053     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_1, value3) == OK);
1054     EXPECT_TRUE(value3 == VALUE_1);
1055     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_2, value3) == OK);
1056     EXPECT_TRUE(value3 == VALUE_2);
1057     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_3, value3) == OK);
1058     EXPECT_TRUE(value3 == VALUE_3);
1059 }
1060 
1061 /**
1062  * @tc.name:  Block Sync 004
1063  * @tc.desc: Verify block sync function invalid args.
1064  * @tc.type: FUNC
1065  * @tc.require: AR000CKRTD AR000CQE0E
1066  * @tc.author: xushaohua
1067  */
1068 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync004, TestSize.Level2)
1069 {
1070     std::vector<std::string> devices;
1071 
1072     /**
1073      * @tc.steps: step1. deviceA put {k1, v1}
1074      */
1075     g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1076 
1077     /**
1078      * @tc.steps: step2. deviceA call block push sync to deviceB & deviceC.
1079      * @tc.expected: step2. Sync return INVALID_ARGS
1080      */
1081     std::map<std::string, DBStatus> result;
1082     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1083     EXPECT_EQ(status, INVALID_ARGS);
1084 
1085     /**
1086      * @tc.steps: step3. deviceB, deviceC offlinem and push deviceA sync to deviceB and deviceC.
1087      * @tc.expected: step3. Sync return OK, but the deviceB and deviceC are TIME_OUT
1088      */
1089     devices.push_back(g_deviceB->GetDeviceId());
1090     devices.push_back(g_deviceC->GetDeviceId());
1091     g_deviceB->Offline();
1092     g_deviceC->Offline();
1093 
1094     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1095     EXPECT_EQ(status, OK);
1096     ASSERT_TRUE(result.size() == devices.size());
1097     for (const auto &pair : result) {
1098         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1099         EXPECT_TRUE(pair.second == COMM_FAILURE);
1100     }
1101 }
1102 
1103 /**
1104  * @tc.name:  Block Sync 005
1105  * @tc.desc: Verify block sync function busy.
1106  * @tc.type: FUNC
1107  * @tc.require: AR000CKRTD AR000CQE0E
1108  * @tc.author: xushaohua
1109  */
1110 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync005, TestSize.Level2)
1111 {
1112     std::vector<std::string> devices;
1113     devices.push_back(g_deviceB->GetDeviceId());
1114     devices.push_back(g_deviceC->GetDeviceId());
1115     /**
1116      * @tc.steps: step1. deviceA put {k1, v1}
1117      */
1118     g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1119 
1120     /**
1121      * @tc.steps: step2. New a thread to deviceA call block push sync to deviceB & deviceC,
1122      *      but deviceB & C is blocked
1123      * @tc.expected: step2. Sync will be blocked util timeout, and then return OK
1124      */
1125     g_deviceB->Offline();
1126     g_deviceC->Offline();
__anond4119c320302() 1127     thread thread([devices]() {
1128         std::map<std::string, DBStatus> resultInner;
1129         DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, resultInner, true);
1130         EXPECT_EQ(status, OK);
1131     });
1132     thread.detach();
1133     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1134     /**
1135      * @tc.steps: step3. sleep 1s and call sync.
1136      * @tc.expected: step3. Sync will return BUSY.
1137      */
1138     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1139     std::map<std::string, DBStatus> result;
1140     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, true);
1141     EXPECT_EQ(status, OK);
1142 }
1143 
1144 /**
1145   * @tc.name: SyncQueue001
1146   * @tc.desc: Invalid args check of Pragma GET_QUEUED_SYNC_SIZE SET_QUEUED_SYNC_LIMIT and
1147   * GET_QUEUED_SYNC_LIMIT, expect return INVALID_ARGS.
1148   * @tc.type: FUNC
1149   * @tc.require: AR000D4876
1150   * @tc.author: wangchuanqing
1151   */
1152 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue001, TestSize.Level3)
1153 {
1154     /**
1155      * @tc.steps:step1. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE, and set param to be null
1156      * @tc.expected: step1. Expect return INVALID_ARGS.
1157      */
1158     int *param = nullptr;
1159     PragmaData input = static_cast<PragmaData>(param);
1160     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), INVALID_ARGS);
1161 
1162     /**
1163      * @tc.steps:step2. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be null
1164      * @tc.expected: step2. Expect return INVALID_ARGS.
1165      */
1166     input = static_cast<PragmaData>(param);
1167     EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1168 
1169     /**
1170      * @tc.steps:step3. Set PragmaCmd to be GET_QUEUED_SYNC_LIMIT, and set param to be null
1171      * @tc.expected: step3. Expect return INVALID_ARGS.
1172      */
1173     input = static_cast<PragmaData>(param);
1174     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1175 
1176     /**
1177      * @tc.steps:step4. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be QUEUED_SYNC_LIMIT_MIN - 1
1178      * @tc.expected: step4. Expect return INVALID_ARGS.
1179      */
1180     int limit = DBConstant::QUEUED_SYNC_LIMIT_MIN - 1;
1181     input = static_cast<PragmaData>(&limit);
1182     EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1183 
1184     /**
1185      * @tc.steps:step5. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be QUEUED_SYNC_LIMIT_MAX + 1
1186      * @tc.expected: step5. Expect return INVALID_ARGS.
1187      */
1188     limit = DBConstant::QUEUED_SYNC_LIMIT_MAX + 1;
1189     input = static_cast<PragmaData>(&limit);
1190     EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1191 }
1192 
1193 /**
1194   * @tc.name: SyncQueue002
1195   * @tc.desc: Pragma GET_QUEUED_SYNC_LIMIT and SET_QUEUED_SYNC_LIMIT
1196   * @tc.type: FUNC
1197   * @tc.require: AR000D4876
1198   * @tc.author: wangchuanqing
1199   */
1200 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue002, TestSize.Level3)
1201 {
1202     /**
1203      * @tc.steps:step1. Set PragmaCmd to be GET_QUEUED_SYNC_LIMIT,
1204      * @tc.expected: step1. Expect return OK, limit eq QUEUED_SYNC_LIMIT_DEFAULT.
1205      */
1206     int limit = 0;
1207     PragmaData input = static_cast<PragmaData>(&limit);
1208     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), OK);
1209     EXPECT_EQ(limit, DBConstant::QUEUED_SYNC_LIMIT_DEFAULT);
1210 
1211     /**
1212      * @tc.steps:step2. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be 50
1213      * @tc.expected: step2. Expect return OK.
1214      */
1215     limit = 50;
1216     input = static_cast<PragmaData>(&limit);
1217     EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), OK);
1218 
1219     /**
1220      * @tc.steps:step3. Set PragmaCmd to be GET_QUEUED_SYNC_LIMIT,
1221      * @tc.expected: step3. Expect return OK, limit eq 50
1222      */
1223     limit = 0;
1224     input = static_cast<PragmaData>(&limit);
1225     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), OK);
1226     EXPECT_EQ(limit, 50);
1227 }
1228 
1229 /**
1230   * @tc.name: SyncQueue003
1231   * @tc.desc: sync queue test
1232   * @tc.type: FUNC
1233   * @tc.require: AR000D4876
1234   * @tc.author: wangchuanqing
1235   */
1236 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue003, TestSize.Level3)
1237 {
1238     DBStatus status = OK;
1239     std::vector<std::string> devices;
1240     devices.push_back(g_deviceB->GetDeviceId());
1241     devices.push_back(g_deviceC->GetDeviceId());
1242 
1243     /**
1244      * @tc.steps:step1. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE,
1245      * @tc.expected: step1. Expect return OK, size eq 0.
1246      */
1247     int size;
1248     PragmaData input = static_cast<PragmaData>(&size);
1249     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), OK);
1250     EXPECT_EQ(size, 0);
1251 
1252     /**
1253      * @tc.steps:step2. deviceA put {k1, v1}
1254      */
1255     status = g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1256     ASSERT_TRUE(status == OK);
1257 
1258     /**
1259      * @tc.steps:step3. deviceA sync SYNC_MODE_PUSH_ONLY
1260      */
1261     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1262     ASSERT_TRUE(status == OK);
1263 
1264     /**
1265      * @tc.steps:step4. deviceA put {k2, v2}
1266      */
1267     status = g_kvDelegatePtr->Put(KEY_2, VALUE_2);
1268     ASSERT_TRUE(status == OK);
1269 
1270     /**
1271      * @tc.steps:step5. deviceA sync SYNC_MODE_PUSH_ONLY
1272      */
1273     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1274     ASSERT_TRUE(status == OK);
1275 
1276     /**
1277      * @tc.steps:step6. deviceB put {k3, v3}
1278      */
1279     g_deviceB->PutData(KEY_3, VALUE_3, 0, 0);
1280 
1281     /**
1282      * @tc.steps:step7. deviceA put {k4, v4}
1283      */
1284     status = g_kvDelegatePtr->Put(KEY_4, VALUE_4);
1285     ASSERT_TRUE(status == OK);
1286 
1287     /**
1288      * @tc.steps:step8. deviceA sync SYNC_MODE_PUSH_PULL
1289      */
1290     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_PULL, nullptr, false);
1291     ASSERT_TRUE(status == OK);
1292 
1293     /**
1294      * @tc.steps:step9. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE,
1295      * @tc.expected: step1. Expect return OK, 0 <= size <= 4
1296      */
1297     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), OK);
1298     ASSERT_TRUE((size >= 0) && (size <= 4));
1299 
1300     /**
1301      * @tc.steps:step10. deviceB put {k5, v5}
1302      */
1303     g_deviceB->PutData(KEY_5, VALUE_5, 0, 0);
1304 
1305     /**
1306      * @tc.steps:step11. deviceA call sync and wait
1307      * @tc.expected: step11. sync should return OK.
1308      */
1309     std::map<std::string, DBStatus> result;
1310     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
1311     ASSERT_TRUE(status == OK);
1312 
1313     /**
1314      * @tc.expected: step11. onComplete should be called, DeviceA,B,C have {k1,v1}~ {KEY_5,VALUE_5}
1315      */
1316     ASSERT_TRUE(result.size() == devices.size());
1317     for (const auto &pair : result) {
1318         EXPECT_TRUE(pair.second == OK);
1319     }
1320     VirtualDataItem item;
1321     g_deviceB->GetData(KEY_1, item);
1322     EXPECT_TRUE(item.value == VALUE_1);
1323     g_deviceB->GetData(KEY_2, item);
1324     EXPECT_TRUE(item.value == VALUE_2);
1325     g_deviceB->GetData(KEY_3, item);
1326     EXPECT_TRUE(item.value == VALUE_3);
1327     g_deviceB->GetData(KEY_4, item);
1328     EXPECT_TRUE(item.value == VALUE_4);
1329     g_deviceB->GetData(KEY_5, item);
1330     EXPECT_TRUE(item.value == VALUE_5);
1331     Value value;
1332     EXPECT_EQ(g_kvDelegatePtr->Get(KEY_3, value), OK);
1333     EXPECT_EQ(VALUE_3, value);
1334     EXPECT_EQ(g_kvDelegatePtr->Get(KEY_5, value), OK);
1335     EXPECT_EQ(VALUE_5, value);
1336 }
1337 
1338 /**
1339   * @tc.name: SyncQueue004
1340   * @tc.desc: sync queue full test
1341   * @tc.type: FUNC
1342   * @tc.require: AR000D4876
1343   * @tc.author: wangchuanqing
1344   */
1345 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue004, TestSize.Level3)
1346 {
1347     DBStatus status = OK;
1348     std::vector<std::string> devices;
1349     devices.push_back(g_deviceB->GetDeviceId());
1350     devices.push_back(g_deviceC->GetDeviceId());
1351 
1352     /**
1353      * @tc.steps:step1. deviceB C block
1354      */
1355     g_communicatorAggregator->SetBlockValue(true);
1356 
1357     /**
1358      * @tc.steps:step2. deviceA put {k1, v1}
1359      */
1360     status = g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1361     ASSERT_TRUE(status == OK);
1362 
1363     /**
1364      * @tc.steps:step3. deviceA sync QUEUED_SYNC_LIMIT_DEFAULT times
1365      * @tc.expected: step3. Expect return OK
1366      */
1367     for (int i = 0; i < DBConstant::QUEUED_SYNC_LIMIT_DEFAULT; i++) {
1368         status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1369         ASSERT_TRUE(status == OK);
1370     }
1371 
1372     /**
1373      * @tc.steps:step4. deviceA sync
1374      * @tc.expected: step4. Expect return BUSY
1375      */
1376     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1377     ASSERT_TRUE(status == BUSY);
1378     g_communicatorAggregator->SetBlockValue(false);
1379 }
1380 
1381 /**
1382   * @tc.name: SyncQueue005
1383   * @tc.desc: block sync queue test
1384   * @tc.type: FUNC
1385   * @tc.require: AR000D4876
1386   * @tc.author: wangchuanqing
1387   */
1388 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue005, TestSize.Level3)
1389 {
1390     std::vector<std::string> devices;
1391     devices.push_back(g_deviceB->GetDeviceId());
1392     devices.push_back(g_deviceC->GetDeviceId());
1393     /**
1394      * @tc.steps:step1. New a thread to deviceA call block push sync to deviceB & deviceC,
1395      *      but deviceB & C is offline
1396      * @tc.expected: step1. Sync will be blocked util timeout, and then return OK
1397      */
1398     g_deviceB->Offline();
1399     g_deviceC->Offline();
1400     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1401 
1402     /**
1403      * @tc.steps:step2. deviceA put {k1, v1}
1404      */
1405     g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1406 
1407     std::mutex lockMutex;
1408     std::condition_variable conditionVar;
1409 
__anond4119c320402() 1410     std::thread threadFirst([devices]() {
1411         std::map<std::string, DBStatus> resultInner;
1412         DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, resultInner, true);
1413         EXPECT_EQ(status, OK);
1414     });
1415     threadFirst.detach();
1416     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1417     /**
1418      * @tc.steps:step3. New a thread to deviceA call block push sync to deviceB & deviceC,
1419      *      but deviceB & C is offline
1420      * @tc.expected: step2. Sync will be blocked util timeout, and then return OK
1421      */
__anond4119c320502() 1422     std::thread threadSecond([devices, &lockMutex, &conditionVar]() {
1423         std::map<std::string, DBStatus> resultInner;
1424         DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, resultInner, true);
1425         EXPECT_EQ(status, OK);
1426         std::unique_lock<mutex> lockInner(lockMutex);
1427         conditionVar.notify_one();
1428     });
1429     threadSecond.detach();
1430 
1431     /**
1432      * @tc.steps:step4. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE,
1433      * @tc.expected: step1. Expect return OK, size eq 0.
1434      */
1435     int size;
1436     PragmaData input = static_cast<PragmaData>(&size);
1437     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), OK);
1438     EXPECT_EQ(size, 0);
1439 
1440     /**
1441      * @tc.steps:step5. wait exit
1442      */
1443     std::unique_lock<mutex> lock(lockMutex);
1444     auto now = std::chrono::system_clock::now();
1445     conditionVar.wait_until(lock, now + 2 * INT8_MAX * 1000ms);
1446 }
1447 
1448 /**
1449   * @tc.name: CalculateSyncData001
1450   * @tc.desc: Test sync data whose device never synced before
1451   * @tc.type: FUNC
1452   * @tc.require: AR000HI2JS
1453   * @tc.author: zhuwentao
1454   */
1455 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData001, TestSize.Level3)
1456 {
1457     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1458     size_t dataSize = g_kvDelegatePtr->GetSyncDataSize(DEVICE_B);
1459     uint32_t serialHeadLen = 8u;
1460     EXPECT_EQ(static_cast<uint32_t>(dataSize), 0u + serialHeadLen);
1461     uint32_t keySize = 256u;
1462     uint32_t valuesize = 1024u;
1463     uint32_t itemCount = 10u;
1464     CalculateDataTest(itemCount, keySize, valuesize);
1465 }
1466 
1467 /**
1468   * @tc.name: CalculateSyncData002
1469   * @tc.desc: Test sync data whose device synced before, but sync data is less than 1M
1470   * @tc.type: FUNC
1471   * @tc.require: AR000HI2JS
1472   * @tc.author: zhuwentao
1473   */
1474 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData002, TestSize.Level3)
1475 {
1476     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1477     Key key1 = {'1'};
1478     Value value1 = {'1'};
1479     EXPECT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
1480 
1481     std::vector<std::string> devices;
1482     devices.push_back(g_deviceB->GetDeviceId());
1483     std::map<std::string, DBStatus> result;
1484     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1485     ASSERT_TRUE(status == OK);
1486     ASSERT_TRUE(result.size() == devices.size());
1487     for (const auto &pair : result) {
1488         EXPECT_TRUE(pair.second == OK);
1489     }
1490 
1491     uint32_t keySize = 256u;
1492     uint32_t valuesize = 512u;
1493     uint32_t itemCount = 20u;
1494     CalculateDataTest(itemCount, keySize, valuesize);
1495 }
1496 
1497 /**
1498   * @tc.name: CalculateSyncData003
1499   * @tc.desc: Test sync data whose device synced before, but sync data is larger than 1M
1500   * @tc.type: FUNC
1501   * @tc.require: AR000HI2JS
1502   * @tc.author: zhuwentao
1503   */
1504 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData003, TestSize.Level3)
1505 {
1506     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1507     Key key1 = {'1'};
1508     Value value1 = {'1'};
1509     EXPECT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
1510 
1511     std::vector<std::string> devices;
1512     devices.push_back(g_deviceB->GetDeviceId());
1513     std::map<std::string, DBStatus> result;
1514     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1515     ASSERT_TRUE(status == OK);
1516     ASSERT_TRUE(result.size() == devices.size());
1517     for (const auto &pair : result) {
1518         EXPECT_TRUE(pair.second == OK);
1519     }
1520     uint32_t keySize = 256u;
1521     uint32_t valuesize = 1024u;
1522     uint32_t itemCount = 2048u;
1523     CalculateDataTest(itemCount, keySize, valuesize);
1524 }
1525 
1526 /**
1527   * @tc.name: CalculateSyncData004
1528   * @tc.desc: Test invalid device when call GetSyncDataSize interface
1529   * @tc.type: FUNC
1530   * @tc.require: AR000HI2JS
1531   * @tc.author: zhuwentao
1532   */
1533 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData004, TestSize.Level3)
1534 {
1535     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1536     std::string device;
1537     EXPECT_EQ(g_kvDelegatePtr->GetSyncDataSize(device), 0u);
1538 }
1539 
1540 /**
1541   * @tc.name: CalculateSyncData005
1542   * @tc.desc: Test CalculateSyncData and rekey Concurrently
1543   * @tc.type: FUNC
1544   * @tc.require: AR000HI2JS
1545   * @tc.author: zhuwentao
1546   */
1547 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData005, TestSize.Level3)
1548 {
1549     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1550     size_t dataSize = 0;
1551     Key key1 = {'1'};
1552     Value value1 = {'1'};
1553     EXPECT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
__anond4119c320602() 1554     std::thread thread1([]() {
1555         std::this_thread::sleep_for(std::chrono::milliseconds(1));
1556         CipherPassword passwd; // random password
1557         vector<uint8_t> passwdBuffer(10, 45);  // 10 and 45 as random password.
1558         passwd.SetValue(passwdBuffer.data(), passwdBuffer.size());
1559         g_kvDelegatePtr->Rekey(passwd);
1560     });
__anond4119c320702() 1561     std::thread thread2([&dataSize, &key1, &value1]() {
1562         dataSize = g_kvDelegatePtr->GetSyncDataSize(DEVICE_B);
1563         if (dataSize > 0) {
1564             uint32_t expectedDataSize = (key1.size() + value1.size());
1565             uint32_t externalSize = 70u;
1566             uint32_t serialHeadLen = 8u;
1567             ASSERT_GE(static_cast<uint32_t>(dataSize), expectedDataSize);
1568             ASSERT_LE(static_cast<uint32_t>(dataSize), serialHeadLen + expectedDataSize + externalSize);
1569         }
1570     });
1571     thread1.join();
1572     thread2.join();
1573 }