• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <condition_variable>
17 #include <gtest/gtest.h>
18 #include <thread>
19 
20 #include "db_constant.h"
21 #include "db_common.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "platform_specific.h"
27 #include "runtime_config.h"
28 #include "single_ver_data_sync.h"
29 #include "single_ver_kv_sync_task_context.h"
30 
31 using namespace testing::ext;
32 using namespace DistributedDB;
33 using namespace DistributedDBUnitTest;
34 using namespace std;
35 
36 namespace {
37     string g_testDir;
38     const string STORE_ID = "kv_stroe_sync_test";
39     const int64_t TIME_OFFSET = 5000000;
40     const int WAIT_TIME = 1000;
41     const std::string DEVICE_A = "real_device";
42     const std::string DEVICE_B = "deviceB";
43     const std::string DEVICE_C = "deviceC";
44 
45     KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
46     KvStoreConfig g_config;
47     DistributedDBToolsUnitTest g_tool;
48     DBStatus g_kvDelegateStatus = INVALID_ARGS;
49     KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
50     VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
51     KvVirtualDevice *g_deviceB = nullptr;
52     KvVirtualDevice *g_deviceC = nullptr;
53 
54     // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
55     auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
56         placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
57 
CalculateDataTest(uint32_t itemCount,uint32_t keySize,uint32_t valueSize)58     void CalculateDataTest(uint32_t itemCount, uint32_t keySize, uint32_t valueSize)
59     {
60         for (uint32_t i = 0; i < itemCount; i++) {
61             std::vector<uint8_t> prefixKey = {'a', 'b', 'c'};
62             Key key = DistributedDBToolsUnitTest::GetRandPrefixKey(prefixKey, keySize);
63             Value value;
64             DistributedDBToolsUnitTest::GetRandomKeyValue(value, valueSize);
65             EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
66         }
67         size_t dataSize = g_kvDelegatePtr->GetSyncDataSize(DEVICE_B);
68         uint32_t expectedDataSize = (valueSize + keySize);
69         uint32_t externalSize = 70u;
70         uint32_t serialHeadLen = 8u;
71         LOGI("expectedDataSize=%u, v=%u", expectedDataSize, externalSize);
72         uint32_t maxDataSize = 1024u * 1024u;
73         if (itemCount * expectedDataSize >= maxDataSize) {
74             EXPECT_EQ(static_cast<uint32_t>(dataSize), maxDataSize);
75             return;
76         }
77         ASSERT_GE(static_cast<uint32_t>(dataSize), itemCount * expectedDataSize);
78         ASSERT_LE(static_cast<uint32_t>(dataSize), serialHeadLen + itemCount * (expectedDataSize + externalSize));
79     }
80 
81 class DistributedDBSingleVerP2PSimpleSyncTest : public testing::Test {
82 public:
83     static void SetUpTestCase(void);
84     static void TearDownTestCase(void);
85     void SetUp();
86     void TearDown();
87 };
88 
SetUpTestCase(void)89 void DistributedDBSingleVerP2PSimpleSyncTest::SetUpTestCase(void)
90 {
91     /**
92      * @tc.setup: Init datadir and Virtual Communicator.
93      */
94     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
95     g_config.dataDir = g_testDir;
96     g_mgr.SetKvStoreConfig(g_config);
97 
98     string dir = g_testDir + "/single_ver";
99     DIR* dirTmp = opendir(dir.c_str());
100     if (dirTmp == nullptr) {
101         OS::MakeDBDirectory(dir);
102     } else {
103         closedir(dirTmp);
104     }
105 
106     g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
107     ASSERT_TRUE(g_communicatorAggregator != nullptr);
108     RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
109 }
110 
TearDownTestCase(void)111 void DistributedDBSingleVerP2PSimpleSyncTest::TearDownTestCase(void)
112 {
113     /**
114      * @tc.teardown: Release virtual Communicator and clear data dir.
115      */
116     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
117         LOGE("rm test db files error!");
118     }
119     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
120 }
121 
SetUp(void)122 void DistributedDBSingleVerP2PSimpleSyncTest::SetUp(void)
123 {
124     DistributedDBToolsUnitTest::PrintTestCaseInfo();
125     /**
126      * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
127      */
128     KvStoreNbDelegate::Option option;
129     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
130     ASSERT_TRUE(g_kvDelegateStatus == OK);
131     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
132     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
133     ASSERT_TRUE(g_deviceB != nullptr);
134     VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
135     ASSERT_TRUE(syncInterfaceB != nullptr);
136     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
137 
138     g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
139     ASSERT_TRUE(g_deviceC != nullptr);
140     VirtualSingleVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
141     ASSERT_TRUE(syncInterfaceC != nullptr);
142     ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
143 
144     auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
145         const std::string &deviceId, uint8_t flag) -> bool {
146             return true;
147         };
148     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
149 }
150 
TearDown(void)151 void DistributedDBSingleVerP2PSimpleSyncTest::TearDown(void)
152 {
153     /**
154      * @tc.teardown: Release device A, B, C
155      */
156     if (g_kvDelegatePtr != nullptr) {
157         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
158         g_kvDelegatePtr = nullptr;
159         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
160         LOGD("delete kv store status %d", status);
161         ASSERT_TRUE(status == OK);
162     }
163     if (g_deviceB != nullptr) {
164         delete g_deviceB;
165         g_deviceB = nullptr;
166     }
167     if (g_deviceC != nullptr) {
168         delete g_deviceC;
169         g_deviceC = nullptr;
170     }
171     PermissionCheckCallbackV2 nullCallback;
172     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
173 }
174 
CheckWatermark(const std::string & dev,KvStoreNbDelegate * kvDelegatePtr,WatermarkInfo expectInfo,bool sendEqual=true,bool receiveEqual=true)175 void CheckWatermark(const std::string &dev, KvStoreNbDelegate *kvDelegatePtr, WatermarkInfo expectInfo,
176     bool sendEqual = true, bool receiveEqual = true)
177 {
178     auto [status, watermarkInfo] = kvDelegatePtr->GetWatermarkInfo(dev);
179     EXPECT_EQ(status, OK);
180     if (sendEqual) {
181         EXPECT_EQ(watermarkInfo.sendMark, expectInfo.sendMark);
182     } else {
183         EXPECT_NE(watermarkInfo.sendMark, expectInfo.sendMark);
184     }
185     if (receiveEqual) {
186         EXPECT_EQ(watermarkInfo.receiveMark, expectInfo.receiveMark);
187     } else {
188         EXPECT_NE(watermarkInfo.receiveMark, expectInfo.receiveMark);
189     }
190 }
191 
192 /**
193  * @tc.name: Normal Sync 001
194  * @tc.desc: Test normal push sync for add data.
195  * @tc.type: FUNC
196  * @tc.require:
197  * @tc.author: xushaohua
198  */
199 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync001, TestSize.Level1)
200 {
201     DBStatus status = OK;
202     std::vector<std::string> devices;
203     devices.push_back(g_deviceB->GetDeviceId());
204     devices.push_back(g_deviceC->GetDeviceId());
205 
206     /**
207      * @tc.steps: step1. deviceA put {k1, v1}
208      */
209     Key key = {'1'};
210     Value value = {'1'};
211 
212     WatermarkInfo info;
213     CheckWatermark(g_deviceB->GetDeviceId(), g_kvDelegatePtr, info);
214     CheckWatermark(g_deviceC->GetDeviceId(), g_kvDelegatePtr, info);
215     status = g_kvDelegatePtr->Put(key, value);
216     ASSERT_TRUE(status == OK);
217 
218     /**
219      * @tc.steps: step2. deviceA call sync and wait
220      * @tc.expected: step2. sync should return OK.
221      */
222     std::map<std::string, DBStatus> result;
223     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
224     ASSERT_TRUE(status == OK);
225     CheckWatermark(g_deviceB->GetDeviceId(), g_kvDelegatePtr, info, false);
226     CheckWatermark(g_deviceC->GetDeviceId(), g_kvDelegatePtr, info, false);
227 
228     /**
229      * @tc.expected: step2. onComplete should be called, DeviceB,C have {k1,v1}
230      */
231     ASSERT_TRUE(result.size() == devices.size());
232     for (const auto &pair : result) {
233         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
234         EXPECT_TRUE(pair.second == OK);
235     }
236     VirtualDataItem item;
237     g_deviceB->GetData(key, item);
238     EXPECT_TRUE(item.value == value);
239     g_deviceC->GetData(key, item);
240     EXPECT_TRUE(item.value == value);
241 }
242 
243 /**
244  * @tc.name: Normal Sync 002
245  * @tc.desc: Test normal push sync for update data.
246  * @tc.type: FUNC
247  * @tc.require:
248  * @tc.author: xushaohua
249  */
250 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync002, TestSize.Level1)
251 {
252     DBStatus status = OK;
253     std::vector<std::string> devices;
254     devices.push_back(g_deviceB->GetDeviceId());
255     devices.push_back(g_deviceC->GetDeviceId());
256 
257     /**
258      * @tc.steps: step1. deviceA put {k1, v1}
259      */
260     Key key = {'1'};
261     Value value = {'1'};
262     status = g_kvDelegatePtr->Put(key, value);
263     ASSERT_TRUE(status == OK);
264 
265     /**
266      * @tc.steps: step2. deviceA put {k1, v2}
267      */
268     Value value2;
269     value2.push_back('2');
270     status = g_kvDelegatePtr->Put(key, value2);
271     ASSERT_TRUE(status == OK);
272 
273     /**
274      * @tc.steps: step3. deviceA call sync and wait
275      * @tc.expected: step3. sync should return OK.
276      */
277     std::map<std::string, DBStatus> result;
278     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
279     ASSERT_TRUE(status == OK);
280 
281     /**
282      * @tc.expected: step3. onComplete should be called, DeviceB,C have {k1,v2}
283      */
284     ASSERT_TRUE(result.size() == devices.size());
285     for (const auto &pair : result) {
286         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
287         EXPECT_TRUE(pair.second == OK);
288     }
289     VirtualDataItem item;
290     g_deviceC->GetData(key, item);
291     EXPECT_TRUE(item.value == value2);
292     g_deviceB->GetData(key, item);
293     EXPECT_TRUE(item.value == value2);
294 }
295 
296 /**
297  * @tc.name: Normal Sync 003
298  * @tc.desc: Test normal push sync for delete data.
299  * @tc.type: FUNC
300  * @tc.require:
301  * @tc.author: xushaohua
302  */
303 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync003, TestSize.Level1)
304 {
305     DBStatus status = OK;
306     std::vector<std::string> devices;
307     devices.push_back(g_deviceB->GetDeviceId());
308     devices.push_back(g_deviceC->GetDeviceId());
309 
310     /**
311      * @tc.steps: step1. deviceA put {k1, v1}
312      */
313     Key key = {'1'};
314     Value value = {'1'};
315     status = g_kvDelegatePtr->Put(key, value);
316     ASSERT_TRUE(status == OK);
317 
318     /**
319      * @tc.steps: step2. deviceA delete k1
320      */
321     status = g_kvDelegatePtr->Delete(key);
322     ASSERT_TRUE(status == OK);
323     std::map<std::string, DBStatus> result;
324     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
325     ASSERT_TRUE(status == OK);
326 
327     /**
328      * @tc.steps: step3. deviceA call sync and wait
329      * @tc.expected: step3. sync should return OK.
330      */
331     ASSERT_TRUE(result.size() == devices.size());
332     for (const auto &pair : result) {
333         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
334         EXPECT_TRUE(pair.second == OK);
335     }
336 
337     /**
338      * @tc.expected: step3. onComplete should be called, DeviceB,C have {k1, delete}
339      */
340     VirtualDataItem item;
341     Key hashKey;
342     DistributedDBToolsUnitTest::CalcHash(key, hashKey);
343     EXPECT_EQ(g_deviceB->GetData(hashKey, item), -E_NOT_FOUND);
344     EXPECT_EQ(g_deviceC->GetData(hashKey, item), -E_NOT_FOUND);
345 }
346 
347 /**
348  * @tc.name: Normal Sync 004
349  * @tc.desc: Test normal pull sync for add data.
350  * @tc.type: FUNC
351  * @tc.require:
352  * @tc.author: xushaohua
353  */
354 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync004, TestSize.Level1)
355 {
356     DBStatus status = OK;
357     std::vector<std::string> devices;
358     devices.push_back(g_deviceB->GetDeviceId());
359     devices.push_back(g_deviceC->GetDeviceId());
360 
361     WatermarkInfo info;
362     CheckWatermark(g_deviceB->GetDeviceId(), g_kvDelegatePtr, info);
363     CheckWatermark(g_deviceC->GetDeviceId(), g_kvDelegatePtr, info);
364     /**
365      * @tc.steps: step1. deviceB put {k1, v1}
366      */
367     Key key = {'1'};
368     Value value = {'1'};
369     g_deviceB->PutData(key, value, 0, 0);
370 
371     /**
372      * @tc.steps: step2. deviceB put {k2, v2}
373      */
374     Key key2 = {'2'};
375     Value value2 = {'2'};
376     g_deviceC->PutData(key2, value2, 0, 0);
377     ASSERT_TRUE(status == OK);
378 
379     /**
380      * @tc.steps: step3. deviceA call pull sync
381      * @tc.expected: step3. sync should return OK.
382      */
383     std::map<std::string, DBStatus> result;
384     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
385     ASSERT_TRUE(status == OK);
386     CheckWatermark(g_deviceB->GetDeviceId(), g_kvDelegatePtr, info, true, false);
387     CheckWatermark(g_deviceC->GetDeviceId(), g_kvDelegatePtr, info, true, false);
388 
389     /**
390      * @tc.expected: step3. onComplete should be called, DeviceA have {k1, VALUE_1}, {K2. VALUE_2}
391      */
392     ASSERT_TRUE(result.size() == devices.size());
393     for (const auto &pair : result) {
394         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
395         EXPECT_TRUE(pair.second == OK);
396     }
397     Value value3;
398     EXPECT_EQ(g_kvDelegatePtr->Get(key, value3), OK);
399     EXPECT_EQ(value3, value);
400     EXPECT_EQ(g_kvDelegatePtr->Get(key2, value3), OK);
401     EXPECT_EQ(value3, value2);
402 }
403 
404 /**
405  * @tc.name: Normal Sync 005
406  * @tc.desc: Test normal pull sync for update data.
407  * @tc.type: FUNC
408  * @tc.require:
409  * @tc.author: xushaohua
410  */
411 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync005, TestSize.Level2)
412 {
413     DBStatus status = OK;
414     std::vector<std::string> devices;
415     devices.push_back(g_deviceB->GetDeviceId());
416     devices.push_back(g_deviceC->GetDeviceId());
417 
418     /**
419      * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
420      */
421     Key key1 = {'1'};
422     Value value1 = {'1'};
423     status = g_kvDelegatePtr->Put(key1, value1);
424     ASSERT_TRUE(status == OK);
425     Key key2 = {'2'};
426     Value value2 = {'2'};
427     status = g_kvDelegatePtr->Put(key2, value2);
428     ASSERT_TRUE(status == OK);
429 
430     /**
431      * @tc.steps: step2. deviceB put {k1, v3} t2, t2 > t1
432      */
433     Value value3;
434     value3.push_back('3');
435     g_deviceB->PutData(key1, value3,
436         TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 0);
437 
438     /**
439      * @tc.steps: step3. deviceC put {k2, v4} t2, t4 < t1
440      */
441     Value value4;
442     value4.push_back('4');
443     g_deviceC->PutData(key2, value4,
444         TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 0);
445 
446     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
447     /**
448      * @tc.steps: step4. deviceA call pull sync
449      * @tc.expected: step4. sync should return OK.
450      */
451     std::map<std::string, DBStatus> result;
452     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
453     ASSERT_TRUE(status == OK);
454 
455     /**
456      * @tc.expected: step4. onComplete should be called, DeviceA have {k1, v3}, {k2. v2}
457      */
458     ASSERT_TRUE(result.size() == devices.size());
459     for (const auto &pair : result) {
460         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
461         EXPECT_TRUE(pair.second == OK);
462     }
463 
464     Value value5;
465     g_kvDelegatePtr->Get(key1, value5);
466     EXPECT_TRUE(value5 == value3);
467     g_kvDelegatePtr->Get(key2, value5);
468     EXPECT_TRUE(value5 == value2);
469 }
470 
471 /**
472  * @tc.name: Normal Sync 006
473  * @tc.desc: Test normal pull sync for delete data.
474  * @tc.type: FUNC
475  * @tc.require:
476  * @tc.author: xushaohua
477  */
478 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync006, TestSize.Level2)
479 {
480     /**
481      * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
482      */
483     Key key1 = {'1'};
484     Value value1 = {'1'};
485     DBStatus status = g_kvDelegatePtr->Put(key1, value1);
486     ASSERT_TRUE(status == OK);
487     Key key2 = {'2'};
488     Value value2 = {'2'};
489     status = g_kvDelegatePtr->Put(key2, value2);
490     ASSERT_TRUE(status == OK);
491 
492     /**
493      * @tc.steps: step2. deviceA put {k1, delete} t2, t2 <t1
494      */
495     Key hashKey1;
496     DistributedDBToolsUnitTest::CalcHash(key1, hashKey1);
497     g_deviceB->PutData(hashKey1, value1,
498         TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 1);
499 
500     /**
501      * @tc.steps: step3. deviceA put {k1, delete} t3, t3 < t1
502      */
503     Key hashKey2;
504     DistributedDBToolsUnitTest::CalcHash(key2, hashKey2);
505     g_deviceC->PutData(hashKey2, value1,
506         TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 0);
507 
508     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
509     /**
510      * @tc.steps: step4. deviceA call pull sync
511      * @tc.expected: step4. sync should return OK.
512      */
513     std::map<std::string, DBStatus> result;
514     std::vector<std::string> devices;
515     devices.push_back(g_deviceB->GetDeviceId());
516     devices.push_back(g_deviceC->GetDeviceId());
517     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
518     ASSERT_TRUE(status == OK);
519 
520     /**
521      * @tc.expected: step4. onComplete should be called, DeviceA have {k2. v2} don't have k1
522      */
523     ASSERT_TRUE(result.size() == devices.size());
524     for (const auto &pair : result) {
525         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
526         EXPECT_TRUE(pair.second == OK);
527     }
528     Value value5;
529     g_kvDelegatePtr->Get(key1, value5);
530     EXPECT_TRUE(value5.empty());
531     g_kvDelegatePtr->Get(key2, value5);
532     EXPECT_TRUE(value5 == value2);
533 }
534 
535 /**
536  * @tc.name: Normal Sync 007
537  * @tc.desc: Test normal push_pull sync for add data.
538  * @tc.type: FUNC
539  * @tc.require:
540  * @tc.author: xushaohua
541  */
542 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync007, TestSize.Level1)
543 {
544     DBStatus status = OK;
545     std::vector<std::string> devices;
546     devices.push_back(g_deviceB->GetDeviceId());
547     devices.push_back(g_deviceC->GetDeviceId());
548 
549     /**
550      * @tc.steps: step1. deviceA put {k1, v1}
551      */
552     Key key1 = {'1'};
553     Value value1 = {'1'};
554     status = g_kvDelegatePtr->Put(key1, value1);
555     EXPECT_TRUE(status == OK);
556 
557     /**
558      * @tc.steps: step1. deviceB put {k2, v2}
559      */
560     Key key2 = {'2'};
561     Value value2 = {'2'};
562     g_deviceB->PutData(key2, value2, 0, 0);
563 
564     /**
565      * @tc.steps: step1. deviceB put {k3, v3}
566      */
567     Key key3 = {'3'};
568     Value value3 = {'3'};
569     g_deviceC->PutData(key3, value3, 0, 0);
570 
571     /**
572      * @tc.steps: step4. deviceA call push_pull sync
573      * @tc.expected: step4. sync should return OK.
574      */
575     std::map<std::string, DBStatus> result;
576     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
577     ASSERT_TRUE(status == OK);
578 
579     ASSERT_TRUE(result.size() == devices.size());
580     for (const auto &pair : result) {
581         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
582         EXPECT_TRUE(pair.second == OK);
583     }
584 
585     /**
586      * @tc.expected: step4. onComplete should be called, DeviceA have {k1. v1}, {k2, v2}, {k3, v3}
587      *     deviceB received {k1. v1}, don't received k3, deviceC received {k1. v1}, don't received k2
588      */
589     Value value4;
590     g_kvDelegatePtr->Get(key2, value4);
591     EXPECT_TRUE(value4 == value2);
592     g_kvDelegatePtr->Get(key3, value4);
593     EXPECT_TRUE(value4 == value3);
594 
595     VirtualDataItem item1;
596     g_deviceB->GetData(key1, item1);
597     EXPECT_TRUE(item1.value == value1);
598     item1.value.clear();
599     g_deviceB->GetData(key3, item1);
600     EXPECT_TRUE(item1.value.empty());
601 
602     VirtualDataItem item2;
603     g_deviceC->GetData(key1, item2);
604     EXPECT_TRUE(item2.value == value1);
605     item2.value.clear();
606     g_deviceC->GetData(key2, item2);
607     EXPECT_TRUE(item2.value.empty());
608 }
609 
610 /**
611  * @tc.name: Normal Sync 008
612  * @tc.desc: Test normal push_pull sync for update data.
613  * @tc.type: FUNC
614  * @tc.require:
615  * @tc.author: xushaohua
616  */
617 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync008, TestSize.Level2)
618 {
619     DBStatus status = OK;
620     std::vector<std::string> devices;
621     devices.push_back(g_deviceB->GetDeviceId());
622     devices.push_back(g_deviceC->GetDeviceId());
623 
624     /**
625      * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
626      */
627     Key key1 = {'1'};
628     Value value1 = {'1'};
629     status = g_kvDelegatePtr->Put(key1, value1);
630     ASSERT_TRUE(status == OK);
631 
632     Key key2 = {'2'};
633     Value value2 = {'2'};
634     status = g_kvDelegatePtr->Put(key2, value2);
635     ASSERT_TRUE(status == OK);
636 
637     /**
638      * @tc.steps: step2. deviceB put {k1, v3} t2, t2 > t1
639      */
640     Value value3 = {'3'};
641     g_deviceB->PutData(key1, value3,
642         TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 0);
643 
644     /**
645      * @tc.steps: step3. deviceB put {k1, v4} t3, t4 <t1
646      */
647     Value value4 = {'4'};
648     g_deviceC->PutData(key2, value4,
649         TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 0);
650     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
651 
652     /**
653      * @tc.steps: step4. deviceA call push_pull sync
654      * @tc.expected: step4. sync should return OK.
655      */
656     std::map<std::string, DBStatus> result;
657     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
658     ASSERT_TRUE(status == OK);
659     ASSERT_TRUE(result.size() == devices.size());
660     for (const auto &pair : result) {
661         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
662         EXPECT_TRUE(pair.second == OK);
663     }
664 
665     /**
666      * @tc.expected: step4. onComplete should be called, DeviceA have {k1. v3}, {k2, v2}
667      *     deviceB have {k1. v3}, deviceC have {k2. v2}
668      */
669     Value value5;
670     g_kvDelegatePtr->Get(key1, value5);
671     EXPECT_EQ(value5, value3);
672     g_kvDelegatePtr->Get(key2, value5);
673     EXPECT_EQ(value5, value2);
674 
675     VirtualDataItem item1;
676     g_deviceB->GetData(key1, item1);
677     EXPECT_TRUE(item1.value == value3);
678     item1.value.clear();
679     g_deviceB->GetData(key2, item1);
680     EXPECT_TRUE(item1.value == value2);
681 
682     VirtualDataItem item2;
683     g_deviceC->GetData(key2, item2);
684     EXPECT_TRUE(item2.value == value2);
685 }
686 
687 /**
688  * @tc.name: Normal Sync 009
689  * @tc.desc: Test normal push_pull sync for delete data.
690  * @tc.type: FUNC
691  * @tc.require:
692  * @tc.author: xushaohua
693  */
694 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync009, TestSize.Level2)
695 {
696     DBStatus status = OK;
697     std::vector<std::string> devices;
698     devices.push_back(g_deviceB->GetDeviceId());
699     devices.push_back(g_deviceC->GetDeviceId());
700 
701     /**
702      * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
703      */
704     Key key1 = {'1'};
705     Value value1 = {'1'};
706     status = g_kvDelegatePtr->Put(key1, value1);
707     ASSERT_TRUE(status == OK);
708 
709     Key key2 = {'2'};
710     Value value2 = {'2'};
711     status = g_kvDelegatePtr->Put(key2, value2);
712     ASSERT_TRUE(status == OK);
713 
714     /**
715      * @tc.steps: step2. deviceB put {k1, delete} t2, t2 > t1
716      */
717     Key hashKey1;
718     DistributedDBToolsUnitTest::CalcHash(key1, hashKey1);
719     g_deviceB->PutData(hashKey1, value1,
720         TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 1);
721 
722     /**
723      * @tc.steps: step3. deviceB put {k1, delete} t3, t2 < t1
724      */
725     Key hashKey2;
726     DistributedDBToolsUnitTest::CalcHash(key2, hashKey2);
727     g_deviceC->PutData(hashKey2, value2,
728         TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 1);
729 
730     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
731     /**
732      * @tc.steps: step4. deviceA call push_pull sync
733      * @tc.expected: step4. sync should return OK.
734      */
735     std::map<std::string, DBStatus> result;
736     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
737     ASSERT_TRUE(status == OK);
738 
739     /**
740      * @tc.expected: step4. onComplete should be called, DeviceA have {k1. delete}, {k2, v2}
741      *     deviceB have {k2. v2}, deviceC have {k2. v2}
742      */
743     ASSERT_TRUE(result.size() == devices.size());
744     for (const auto &pair : result) {
745         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
746         EXPECT_TRUE(pair.second == OK);
747     }
748 
749     Value value3;
750     g_kvDelegatePtr->Get(key1, value3);
751     EXPECT_TRUE(value3.empty());
752     value3.clear();
753     g_kvDelegatePtr->Get(key2, value3);
754     EXPECT_TRUE(value3 == value2);
755 
756     VirtualDataItem item1;
757     g_deviceB->GetData(key2, item1);
758     EXPECT_TRUE(item1.value == value2);
759 
760     VirtualDataItem item2;
761     g_deviceC->GetData(key2, item2);
762     EXPECT_TRUE(item2.value == value2);
763 }
764 
765 /**
766  * @tc.name: Normal Sync 010
767  * @tc.desc: Test sync failed by invalid devices.
768  * @tc.type: FUNC
769  * @tc.require:
770  * @tc.author: zhangqiquan
771  */
772 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync010, TestSize.Level1)
773 {
774     DBStatus status = OK;
775     std::vector<std::string> devices;
776     std::string invalidDev = std::string(DBConstant::MAX_DEV_LENGTH + 1, '0');
777     devices.push_back(DEVICE_A);
778     devices.push_back(g_deviceB->GetDeviceId());
779     devices.push_back(invalidDev);
780 
781     std::map<std::string, DBStatus> result;
782     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
783     ASSERT_TRUE(status == OK);
784 
785     ASSERT_EQ(result.size(), devices.size());
786     EXPECT_EQ(result[DEVICE_A], INVALID_ARGS);
787     EXPECT_EQ(result[invalidDev], INVALID_ARGS);
788     EXPECT_EQ(result[DEVICE_B], OK);
789 }
790 
791 /**
792  * @tc.name: Normal Sync 011
793  * @tc.desc: Test sync with translated id.
794  * @tc.type: FUNC
795  * @tc.require:
796  * @tc.author: zhangqiquan
797  */
798 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync011, TestSize.Level1)
799 {
800     DBStatus status = OK;
801     std::vector<std::string> devices;
802     devices.push_back(g_deviceB->GetDeviceId());
803 
804     /**
805      * @tc.steps: step1. deviceA put {k1, v1}
806      */
807     Key key = {'1'};
808     Value value = {'1'};
809     status = g_kvDelegatePtr->Put(key, value);
810     ASSERT_TRUE(status == OK);
811 
812     /**
813      * @tc.steps: step2. ori dev will be append test
814      * @tc.expected: step2. sync should return OK.
815      */
__anonb4c415b50302(const std::string &oriDevId, const StoreInfo &) 816     RuntimeConfig::SetTranslateToDeviceIdCallback([](const std::string &oriDevId, const StoreInfo &) {
817         std::string dev = oriDevId + "test";
818         LOGI("translate %s to %s", oriDevId.c_str(), dev.c_str());
819         return dev;
820     });
821     std::map<std::string, DBStatus> result;
822     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
823     ASSERT_TRUE(status == OK);
824     RuntimeConfig::SetTranslateToDeviceIdCallback(nullptr);
825 
826     /**
827      * @tc.expected: step2. onComplete should be called, Send watermark should not be zero
828      */
829     ASSERT_TRUE(result.size() == devices.size());
830     for (const auto &pair : result) {
831         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
832         EXPECT_TRUE(pair.second == OK);
833     }
834     WatermarkInfo info;
835     CheckWatermark(g_deviceB->GetDeviceId(), g_kvDelegatePtr, info, false);
836     info = {};
837     std::string checkDev = g_deviceB->GetDeviceId() + "test";
838     CheckWatermark(checkDev, g_kvDelegatePtr, info, false);
839 }
840 
841 /**
842  * @tc.name: Normal Sync 012
843  * @tc.desc: Test sync with max data.
844  * @tc.type: FUNC
845  * @tc.require:
846  * @tc.author: wangxiangdong
847  */
848 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync012, TestSize.Level1)
849 {
850     /**
851      * @tc.steps: step1. prepare env
852      */
853     DBStatus status = OK;
854     std::vector<std::string> devices;
855     devices.push_back(g_deviceB->GetDeviceId());
856     uint32_t maxValueSize = 64 * 1024 * 1024;
857     Value maxValue;
858     DistributedDBToolsUnitTest::GetRandomKeyValue(maxValue, maxValueSize); // 64M
859     PragmaData input = static_cast<PragmaData>(&maxValueSize);
860     status = g_kvDelegatePtr->Pragma(SET_MAX_VALUE_SIZE, input);
861     /**
862      * @tc.steps: step2. deviceA put {k1, v1}
863      */
864     Key key = {'1'};
865     status = g_kvDelegatePtr->Put(key, maxValue);
866     Key key2 = {'2'};
867     Value value2 = {'2'};
868     status = g_kvDelegatePtr->Put(key2, value2);
869     ASSERT_TRUE(status == OK);
870 
871     /**
872      * @tc.steps: step3. ori dev will be append test
873      * @tc.expected: step3. sync should return OK.
874      */
__anonb4c415b50402(const std::string &oriDevId, const StoreInfo &) 875     RuntimeConfig::SetTranslateToDeviceIdCallback([](const std::string &oriDevId, const StoreInfo &) {
876         std::string dev = oriDevId + "test";
877         LOGI("translate %s to %s", oriDevId.c_str(), dev.c_str());
878         return dev;
879     });
880     std::map<std::string, DBStatus> result;
881     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
882     ASSERT_TRUE(status == OK);
883     RuntimeConfig::SetTranslateToDeviceIdCallback(nullptr);
884 
885     /**
886      * @tc.expected: step4. onComplete should be called, deviceB should get same data
887      */
888     ASSERT_TRUE(result.size() == devices.size());
889     for (const auto &pair : result) {
890         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
891         EXPECT_TRUE(pair.second == OK);
892     }
893     VirtualDataItem item;
894     g_deviceB->GetData(key, item);
895     EXPECT_TRUE(item.value == maxValue);
896     VirtualDataItem item2;
897     g_deviceB->GetData(key2, item2);
898     EXPECT_TRUE(item2.value == value2);
899 }
900 
901 /**
902  * @tc.name: Limit Data Sync 001
903  * @tc.desc: Test sync limit key and value data
904  * @tc.type: FUNC
905  * @tc.require:
906  * @tc.author: xushaohua
907  */
908 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, LimitDataSync001, TestSize.Level1)
909 {
910     DBStatus status = OK;
911     std::vector<std::string> devices;
912     devices.push_back(g_deviceB->GetDeviceId());
913 
914     Key key1;
915     Value value1;
916     DistributedDBToolsUnitTest::GetRandomKeyValue(key1, DBConstant::MAX_KEY_SIZE + 1);
917     DistributedDBToolsUnitTest::GetRandomKeyValue(value1, DBConstant::MAX_VALUE_SIZE + 1);
918 
919     Key key2;
920     Value value2;
921     DistributedDBToolsUnitTest::GetRandomKeyValue(key2, DBConstant::MAX_KEY_SIZE);
922     DistributedDBToolsUnitTest::GetRandomKeyValue(value2, DBConstant::MAX_VALUE_SIZE);
923 
924     /**
925      * @tc.steps: step1. deviceB put {k1, v1}, K1 > 1k, v1 > 4M
926      */
927     g_deviceB->PutData(key1, value1, 0, 0);
928 
929     /**
930      * @tc.steps: step2. deviceB put {k2, v2}, K2 = 1k, v2 = 4M
931      */
932     g_deviceC->PutData(key2, value2, 0, 0);
933 
934     /**
935      * @tc.steps: step3. deviceA call pull sync from device B
936      * @tc.expected: step3. sync should return OK.
937      */
938     std::map<std::string, DBStatus> result;
939     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
940     ASSERT_TRUE(status == OK);
941 
942     /**
943      * @tc.expected: step3. onComplete should be called.
944      */
945     ASSERT_TRUE(result.size() == devices.size());
946     for (const auto &pair : result) {
947         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
948         if (pair.first == g_deviceB->GetDeviceId()) {
949             EXPECT_TRUE(pair.second != OK);
950         } else {
951             EXPECT_TRUE(pair.second == OK);
952         }
953     }
954 
955     /**
956      * @tc.steps: step4. deviceA call pull sync from deviceC
957      * @tc.expected: step4. sync should return OK.
958      */
959     devices.clear();
960     result.clear();
961     devices.push_back(g_deviceC->GetDeviceId());
962     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
963     ASSERT_TRUE(status == OK);
964 
965     /**
966      * @tc.expected: step4. onComplete should be called, DeviceA have {k2. v2}, don't have {k1, v1}
967      */
968     ASSERT_TRUE(result.size() == devices.size());
969     for (const auto &pair : result) {
970         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
971         EXPECT_TRUE(pair.second == OK);
972     }
973 
974     // Get value from A
975     Value valueRead;
976     EXPECT_TRUE(g_kvDelegatePtr->Get(key1, valueRead) != OK);
977     valueRead.clear();
978     EXPECT_EQ(g_kvDelegatePtr->Get(key2, valueRead), OK);
979     EXPECT_TRUE(valueRead == value2);
980 }
981 
982 /**
983  * @tc.name: Limit Data Sync 002
984  * @tc.desc: Test PutBatch with invalid entries and then call sync.
985  * @tc.type: FUNC
986  * @tc.require:
987  * @tc.author: mazhao
988  */
989 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, LimitDataSync002, TestSize.Level1)
990 {
991     DBStatus status = OK;
992     std::vector<std::string> devices;
993     devices.push_back(g_deviceB->GetDeviceId());
994     devices.push_back(g_deviceC->GetDeviceId());
995     Key legalKey;
996     DistributedDBToolsUnitTest::GetRandomKeyValue(legalKey, DBConstant::MAX_KEY_SIZE); // 1K
997     Value legalValue;
998     DistributedDBToolsUnitTest::GetRandomKeyValue(legalValue, DBConstant::MAX_VALUE_SIZE); // 4M
999     Value emptyValue; // 0k
1000     vector<Entry> illegalEntrys; // size is 512M + 1KB
1001     for (int i = 0; i < 127; i++) { // 127 * (legalValue + legalKey) is equal to 508M + 127KB < 512M.
1002         illegalEntrys.push_back({legalKey, legalValue});
1003     }
1004     for (int i = 0; i < 3970; i++) { // 3970 * legalKey is equal to 3970KB.
1005         illegalEntrys.push_back({legalKey, emptyValue});
1006     }
1007     /**
1008      * @tc.steps: step1. PutBatch with invalid entries inside which total length of the key and valud is more than 512M
1009      * @tc.expected: step1. PutBatch should return INVALID_ARGS.
1010      */
1011     EXPECT_EQ(g_kvDelegatePtr->PutBatch(illegalEntrys), INVALID_ARGS);
1012     /**
1013      * @tc.steps: step2. deviceA call push_pull sync
1014      * @tc.expected: step2. sync return OK and all statuses is OK.
1015      */
1016     std::map<std::string, DBStatus> result;
1017     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
1018     ASSERT_TRUE(status == OK);
1019     ASSERT_TRUE(result.size() == devices.size());
1020     for (const auto &pair : result) {
1021         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1022         EXPECT_TRUE(pair.second == OK);
1023     }
1024 }
1025 
1026 /**
1027  * @tc.name: Auto Sync 001
1028  * @tc.desc: Verify auto sync enable function.
1029  * @tc.type: FUNC
1030  * @tc.require:
1031  * @tc.author: xushaohua
1032  */
1033 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, AutoSync001, TestSize.Level1)
1034 {
1035     std::vector<std::string> devices;
1036     devices.push_back(g_deviceB->GetDeviceId());
1037     devices.push_back(g_deviceC->GetDeviceId());
1038 
1039     /**
1040      * @tc.steps: step1. enable auto sync
1041      * @tc.expected: step1, Pragma return OK.
1042      */
1043     bool autoSync = true;
1044     PragmaData data = static_cast<PragmaData>(&autoSync);
1045     DBStatus status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1046     ASSERT_EQ(status, OK);
1047 
1048     /**
1049      * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
1050      */
1051     ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
1052     ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_2, VALUE_2) == OK);
1053 
1054     /**
1055      * @tc.steps: step3. sleep for data sync
1056      * @tc.expected: step3. deviceB,C has {k1, v1}, {k2, v2}
1057      */
1058     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1059     VirtualDataItem item;
1060     g_deviceB->GetData(KEY_1, item);
1061     EXPECT_EQ(item.value, VALUE_1);
1062     g_deviceB->GetData(KEY_2, item);
1063     EXPECT_EQ(item.value, VALUE_2);
1064     g_deviceC->GetData(KEY_1, item);
1065     EXPECT_EQ(item.value, VALUE_1);
1066     g_deviceC->GetData(KEY_2, item);
1067     EXPECT_EQ(item.value, VALUE_2);
1068 }
1069 
1070 /**
1071  * @tc.name: Auto Sync 002
1072  * @tc.desc: Verify auto sync disable function.
1073  * @tc.type: FUNC
1074  * @tc.require:
1075  * @tc.author: xushaohua
1076  */
1077 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, AutoSync002, TestSize.Level1)
1078 {
1079     std::vector<std::string> devices;
1080     devices.push_back(g_deviceB->GetDeviceId());
1081     devices.push_back(g_deviceC->GetDeviceId());
1082 
1083     /**
1084      * @tc.steps: step1. disable auto sync
1085      * @tc.expected: step1, Pragma return OK.
1086      */
1087     bool autoSync = false;
1088     PragmaData data = static_cast<PragmaData>(&autoSync);
1089     DBStatus status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1090     ASSERT_EQ(status, OK);
1091 
1092     /**
1093      * @tc.steps: step2. deviceB put {k1, v1}, deviceC put {k2, v2}
1094      */
1095     g_deviceB->PutData(KEY_1, VALUE_1, 0, 0);
1096     g_deviceC->PutData(KEY_2, VALUE_2, 0, 0);
1097 
1098     /**
1099      * @tc.steps: step3. sleep for data sync
1100      * @tc.expected: step3. deviceA don't have k1, k2.
1101      */
1102     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1103     Value value3;
1104     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_1, value3) == NOT_FOUND);
1105     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_2, value3) == NOT_FOUND);
1106 }
1107 
1108 /**
1109  * @tc.name: Block Sync 001
1110  * @tc.desc: Verify block push sync function.
1111  * @tc.type: FUNC
1112  * @tc.require:
1113  * @tc.author: xushaohua
1114  */
1115 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync001, TestSize.Level1)
1116 {
1117     std::vector<std::string> devices;
1118     devices.push_back(g_deviceB->GetDeviceId());
1119     devices.push_back(g_deviceC->GetDeviceId());
1120 
1121     /**
1122      * @tc.steps: step1. deviceA put {k1, v1}
1123      */
1124     g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1125 
1126     /**
1127      * @tc.steps: step2. deviceA call block push sync to deviceB & deviceC.
1128      * @tc.expected: step2. Sync return OK, devices status OK, deviceB & deivceC has {k1, v1}.
1129      */
1130     std::map<std::string, DBStatus> result;
1131     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1132     ASSERT_EQ(status, OK);
1133     ASSERT_TRUE(result.size() == devices.size());
1134     for (const auto &pair : result) {
1135         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1136         EXPECT_TRUE(pair.second == OK);
1137     }
1138     VirtualDataItem item1;
1139     EXPECT_EQ(g_deviceB->GetData(KEY_1, item1), E_OK);
1140     EXPECT_EQ(item1.value, VALUE_1);
1141     VirtualDataItem item2;
1142     EXPECT_EQ(g_deviceC->GetData(KEY_1, item2), E_OK);
1143     EXPECT_EQ(item2.value, VALUE_1);
1144 }
1145 
1146 /**
1147  * @tc.name:  Block Sync 002
1148  * @tc.desc: Verify block pull sync function.
1149  * @tc.type: FUNC
1150  * @tc.require:
1151  * @tc.author: xushaohua
1152  */
1153 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync002, TestSize.Level1)
1154 {
1155     std::vector<std::string> devices;
1156     devices.push_back(g_deviceB->GetDeviceId());
1157     devices.push_back(g_deviceC->GetDeviceId());
1158 
1159     /**
1160      * @tc.steps: step1. deviceB put {k1, v1}, deviceC put {k2, v2}
1161      */
1162     g_deviceB->PutData(KEY_1, VALUE_1, 0, 0);
1163     g_deviceC->PutData(KEY_2, VALUE_2, 0, 0);
1164 
1165     /**
1166      * @tc.steps: step2. deviceA call block pull and pull sync to deviceB & deviceC.
1167      * @tc.expected: step2. Sync return OK, devices status OK, deviceA has {k1, v1}, {k2, v2}
1168      */
1169     std::map<std::string, DBStatus> result;
1170     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1171     ASSERT_EQ(status, OK);
1172     ASSERT_TRUE(result.size() == devices.size());
1173     for (const auto &pair : result) {
1174         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1175         EXPECT_TRUE(pair.second == OK);
1176     }
1177     Value value3;
1178     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_1, value3) == OK);
1179     EXPECT_TRUE(value3 == VALUE_1);
1180     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_2, value3) == OK);
1181     EXPECT_TRUE(value3 == VALUE_2);
1182 }
1183 
1184 /**
1185  * @tc.name:  Block Sync 003
1186  * @tc.desc: Verify block push and pull sync function.
1187  * @tc.type: FUNC
1188  * @tc.require:
1189  * @tc.author: xushaohua
1190  */
1191 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync003, TestSize.Level1)
1192 {
1193     std::vector<std::string> devices;
1194     devices.push_back(g_deviceB->GetDeviceId());
1195     devices.push_back(g_deviceC->GetDeviceId());
1196 
1197     /**
1198      * @tc.steps: step1. deviceA put {k1, v1}
1199      */
1200     g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1201 
1202     /**
1203      * @tc.steps: step2. deviceB put {k1, v1}, deviceB put {k2, v2}
1204      */
1205     g_deviceB->PutData(KEY_2, VALUE_2, 0, 0);
1206     g_deviceC->PutData(KEY_3, VALUE_3, 0, 0);
1207 
1208     /**
1209      * @tc.steps: step3. deviceA call block pull and pull sync to deviceB & deviceC.
1210      * @tc.expected: step3. Sync return OK, devices status OK, deviceA has {k1, v1}, {k2, v2} {k3, v3}
1211      *      deviceB has {k1, v1}, {k2. v2} , deviceC has {k1, v1}, {k3, v3}
1212      */
1213     std::map<std::string, DBStatus> result;
1214     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, true);
1215     ASSERT_EQ(status, OK);
1216     ASSERT_TRUE(result.size() == devices.size());
1217     for (const auto &pair : result) {
1218         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1219         EXPECT_TRUE(pair.second == OK);
1220     }
1221 
1222     VirtualDataItem item1;
1223     g_deviceB->GetData(KEY_1, item1);
1224     EXPECT_TRUE(item1.value == VALUE_1);
1225     g_deviceB->GetData(KEY_2, item1);
1226     EXPECT_TRUE(item1.value == VALUE_2);
1227 
1228     VirtualDataItem item2;
1229     g_deviceC->GetData(KEY_1, item2);
1230     EXPECT_TRUE(item2.value == VALUE_1);
1231     g_deviceC->GetData(KEY_3, item2);
1232     EXPECT_TRUE(item2.value == VALUE_3);
1233 
1234     Value value3;
1235     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_1, value3) == OK);
1236     EXPECT_TRUE(value3 == VALUE_1);
1237     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_2, value3) == OK);
1238     EXPECT_TRUE(value3 == VALUE_2);
1239     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_3, value3) == OK);
1240     EXPECT_TRUE(value3 == VALUE_3);
1241 }
1242 
1243 /**
1244  * @tc.name:  Block Sync 004
1245  * @tc.desc: Verify block sync function invalid args.
1246  * @tc.type: FUNC
1247  * @tc.require:
1248  * @tc.author: xushaohua
1249  */
1250 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync004, TestSize.Level2)
1251 {
1252     std::vector<std::string> devices;
1253 
1254     /**
1255      * @tc.steps: step1. deviceA put {k1, v1}
1256      */
1257     g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1258 
1259     /**
1260      * @tc.steps: step2. deviceA call block push sync to deviceB & deviceC.
1261      * @tc.expected: step2. Sync return INVALID_ARGS
1262      */
1263     std::map<std::string, DBStatus> result;
1264     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1265     EXPECT_EQ(status, INVALID_ARGS);
1266 
1267     /**
1268      * @tc.steps: step3. deviceB, deviceC offlinem and push deviceA sync to deviceB and deviceC.
1269      * @tc.expected: step3. Sync return OK, but the deviceB and deviceC are TIME_OUT
1270      */
1271     devices.push_back(g_deviceB->GetDeviceId());
1272     devices.push_back(g_deviceC->GetDeviceId());
1273     g_deviceB->Offline();
1274     g_deviceC->Offline();
1275 
1276     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1277     EXPECT_EQ(status, OK);
1278     ASSERT_TRUE(result.size() == devices.size());
1279     for (const auto &pair : result) {
1280         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1281         // If syncTaskContext of deviceB is scheduled to be executed first, ClearAllSyncTask is
1282         // invoked when OfflineHandleByDevice is triggered, and SyncOperation::Finished() is triggered in advance.
1283         // The returned status is COMM_FAILURE
1284         EXPECT_TRUE((pair.second == static_cast<DBStatus>(-E_PERIPHERAL_INTERFACE_FAIL)) ||
1285             (pair.second == COMM_FAILURE));
1286     }
1287 }
1288 
1289 /**
1290  * @tc.name:  Block Sync 005
1291  * @tc.desc: Verify block sync function busy.
1292  * @tc.type: FUNC
1293  * @tc.require:
1294  * @tc.author: xushaohua
1295  */
1296 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync005, TestSize.Level2)
1297 {
1298     std::vector<std::string> devices;
1299     devices.push_back(g_deviceB->GetDeviceId());
1300     devices.push_back(g_deviceC->GetDeviceId());
1301     /**
1302      * @tc.steps: step1. deviceA put {k1, v1}
1303      */
1304     g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1305 
1306     /**
1307      * @tc.steps: step2. New a thread to deviceA call block push sync to deviceB & deviceC,
1308      *      but deviceB & C is blocked
1309      * @tc.expected: step2. Sync will be blocked util timeout, and then return OK
1310      */
1311     g_deviceB->Offline();
1312     g_deviceC->Offline();
__anonb4c415b50502() 1313     thread thread([devices]() {
1314         std::map<std::string, DBStatus> resultInner;
1315         DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, resultInner, true);
1316         EXPECT_EQ(status, OK);
1317     });
1318     thread.detach();
1319     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1320     /**
1321      * @tc.steps: step3. sleep 1s and call sync.
1322      * @tc.expected: step3. Sync will return BUSY.
1323      */
1324     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1325     std::map<std::string, DBStatus> result;
1326     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, true);
1327     EXPECT_EQ(status, OK);
1328 }
1329 
1330 /**
1331   * @tc.name: SyncQueue001
1332   * @tc.desc: Invalid args check of Pragma GET_QUEUED_SYNC_SIZE SET_QUEUED_SYNC_LIMIT and
1333   * GET_QUEUED_SYNC_LIMIT, expect return INVALID_ARGS.
1334   * @tc.type: FUNC
1335   * @tc.require:
1336   * @tc.author: wangchuanqing
1337   */
1338 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue001, TestSize.Level3)
1339 {
1340     /**
1341      * @tc.steps:step1. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE, and set param to be null
1342      * @tc.expected: step1. Expect return INVALID_ARGS.
1343      */
1344     int *param = nullptr;
1345     PragmaData input = static_cast<PragmaData>(param);
1346     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), INVALID_ARGS);
1347 
1348     /**
1349      * @tc.steps:step2. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be null
1350      * @tc.expected: step2. Expect return INVALID_ARGS.
1351      */
1352     input = static_cast<PragmaData>(param);
1353     EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1354 
1355     /**
1356      * @tc.steps:step3. Set PragmaCmd to be GET_QUEUED_SYNC_LIMIT, and set param to be null
1357      * @tc.expected: step3. Expect return INVALID_ARGS.
1358      */
1359     input = static_cast<PragmaData>(param);
1360     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1361 
1362     /**
1363      * @tc.steps:step4. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be QUEUED_SYNC_LIMIT_MIN - 1
1364      * @tc.expected: step4. Expect return INVALID_ARGS.
1365      */
1366     int limit = DBConstant::QUEUED_SYNC_LIMIT_MIN - 1;
1367     input = static_cast<PragmaData>(&limit);
1368     EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1369 
1370     /**
1371      * @tc.steps:step5. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be QUEUED_SYNC_LIMIT_MAX + 1
1372      * @tc.expected: step5. Expect return INVALID_ARGS.
1373      */
1374     limit = DBConstant::QUEUED_SYNC_LIMIT_MAX + 1;
1375     input = static_cast<PragmaData>(&limit);
1376     EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1377 }
1378 
1379 /**
1380   * @tc.name: SyncQueue002
1381   * @tc.desc: Pragma GET_QUEUED_SYNC_LIMIT and SET_QUEUED_SYNC_LIMIT
1382   * @tc.type: FUNC
1383   * @tc.require:
1384   * @tc.author: wangchuanqing
1385   */
1386 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue002, TestSize.Level3)
1387 {
1388     /**
1389      * @tc.steps:step1. Set PragmaCmd to be GET_QUEUED_SYNC_LIMIT,
1390      * @tc.expected: step1. Expect return OK, limit eq QUEUED_SYNC_LIMIT_DEFAULT.
1391      */
1392     int limit = 0;
1393     PragmaData input = static_cast<PragmaData>(&limit);
1394     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), OK);
1395     EXPECT_EQ(limit, DBConstant::QUEUED_SYNC_LIMIT_DEFAULT);
1396 
1397     /**
1398      * @tc.steps:step2. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be 50
1399      * @tc.expected: step2. Expect return OK.
1400      */
1401     limit = 50;
1402     input = static_cast<PragmaData>(&limit);
1403     EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), OK);
1404 
1405     /**
1406      * @tc.steps:step3. Set PragmaCmd to be GET_QUEUED_SYNC_LIMIT,
1407      * @tc.expected: step3. Expect return OK, limit eq 50
1408      */
1409     limit = 0;
1410     input = static_cast<PragmaData>(&limit);
1411     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), OK);
1412     EXPECT_EQ(limit, 50);
1413 }
1414 
1415 /**
1416   * @tc.name: SyncQueue003
1417   * @tc.desc: sync queue test
1418   * @tc.type: FUNC
1419   * @tc.require:
1420   * @tc.author: wangchuanqing
1421   */
1422 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue003, TestSize.Level3)
1423 {
1424     DBStatus status = OK;
1425     std::vector<std::string> devices;
1426     devices.push_back(g_deviceB->GetDeviceId());
1427     devices.push_back(g_deviceC->GetDeviceId());
1428 
1429     /**
1430      * @tc.steps:step1. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE,
1431      * @tc.expected: step1. Expect return OK, size eq 0.
1432      */
1433     int size;
1434     PragmaData input = static_cast<PragmaData>(&size);
1435     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), OK);
1436     EXPECT_EQ(size, 0);
1437 
1438     /**
1439      * @tc.steps:step2. deviceA put {k1, v1}
1440      */
1441     status = g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1442     ASSERT_TRUE(status == OK);
1443 
1444     /**
1445      * @tc.steps:step3. deviceA sync SYNC_MODE_PUSH_ONLY
1446      */
1447     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1448     ASSERT_TRUE(status == OK);
1449 
1450     /**
1451      * @tc.steps:step4. deviceA put {k2, v2}
1452      */
1453     status = g_kvDelegatePtr->Put(KEY_2, VALUE_2);
1454     ASSERT_TRUE(status == OK);
1455 
1456     /**
1457      * @tc.steps:step5. deviceA sync SYNC_MODE_PUSH_ONLY
1458      */
1459     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1460     ASSERT_TRUE(status == OK);
1461 
1462     /**
1463      * @tc.steps:step6. deviceB put {k3, v3}
1464      */
1465     g_deviceB->PutData(KEY_3, VALUE_3, 0, 0);
1466 
1467     /**
1468      * @tc.steps:step7. deviceA put {k4, v4}
1469      */
1470     status = g_kvDelegatePtr->Put(KEY_4, VALUE_4);
1471     ASSERT_TRUE(status == OK);
1472 
1473     /**
1474      * @tc.steps:step8. deviceA sync SYNC_MODE_PUSH_PULL
1475      */
1476     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_PULL, nullptr, false);
1477     ASSERT_TRUE(status == OK);
1478 
1479     /**
1480      * @tc.steps:step9. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE,
1481      * @tc.expected: step1. Expect return OK, 0 <= size <= 4
1482      */
1483     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), OK);
1484     ASSERT_TRUE((size >= 0) && (size <= 4));
1485 
1486     /**
1487      * @tc.steps:step10. deviceB put {k5, v5}
1488      */
1489     g_deviceB->PutData(KEY_5, VALUE_5, 0, 0);
1490 
1491     /**
1492      * @tc.steps:step11. deviceA call sync and wait
1493      * @tc.expected: step11. sync should return OK.
1494      */
1495     std::map<std::string, DBStatus> result;
1496     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
1497     ASSERT_TRUE(status == OK);
1498 
1499     /**
1500      * @tc.expected: step11. onComplete should be called, DeviceA,B,C have {k1,v1}~ {KEY_5,VALUE_5}
1501      */
1502     ASSERT_TRUE(result.size() == devices.size());
1503     for (const auto &pair : result) {
1504         EXPECT_TRUE(pair.second == OK);
1505     }
1506     VirtualDataItem item;
1507     g_deviceB->GetData(KEY_1, item);
1508     EXPECT_TRUE(item.value == VALUE_1);
1509     g_deviceB->GetData(KEY_2, item);
1510     EXPECT_TRUE(item.value == VALUE_2);
1511     g_deviceB->GetData(KEY_3, item);
1512     EXPECT_TRUE(item.value == VALUE_3);
1513     g_deviceB->GetData(KEY_4, item);
1514     EXPECT_TRUE(item.value == VALUE_4);
1515     g_deviceB->GetData(KEY_5, item);
1516     EXPECT_TRUE(item.value == VALUE_5);
1517     Value value;
1518     EXPECT_EQ(g_kvDelegatePtr->Get(KEY_3, value), OK);
1519     EXPECT_EQ(VALUE_3, value);
1520     EXPECT_EQ(g_kvDelegatePtr->Get(KEY_5, value), OK);
1521     EXPECT_EQ(VALUE_5, value);
1522 }
1523 
1524 /**
1525   * @tc.name: SyncQueue004
1526   * @tc.desc: sync queue full test
1527   * @tc.type: FUNC
1528   * @tc.require:
1529   * @tc.author: wangchuanqing
1530   */
1531 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue004, TestSize.Level3)
1532 {
1533     DBStatus status = OK;
1534     std::vector<std::string> devices;
1535     devices.push_back(g_deviceB->GetDeviceId());
1536     devices.push_back(g_deviceC->GetDeviceId());
1537 
1538     /**
1539      * @tc.steps:step1. deviceB C block
1540      */
1541     g_communicatorAggregator->SetBlockValue(true);
1542 
1543     /**
1544      * @tc.steps:step2. deviceA put {k1, v1}
1545      */
1546     status = g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1547     ASSERT_TRUE(status == OK);
1548 
1549     /**
1550      * @tc.steps:step3. deviceA sync QUEUED_SYNC_LIMIT_DEFAULT times
1551      * @tc.expected: step3. Expect return OK
1552      */
1553     for (int i = 0; i < DBConstant::QUEUED_SYNC_LIMIT_DEFAULT; i++) {
1554         status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1555         ASSERT_TRUE(status == OK);
1556     }
1557 
1558     /**
1559      * @tc.steps:step4. deviceA sync
1560      * @tc.expected: step4. Expect return BUSY
1561      */
1562     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1563     ASSERT_TRUE(status == BUSY);
1564     g_communicatorAggregator->SetBlockValue(false);
1565 }
1566 
1567 /**
1568   * @tc.name: SyncQueue005
1569   * @tc.desc: block sync queue test
1570   * @tc.type: FUNC
1571   * @tc.require:
1572   * @tc.author: wangchuanqing
1573   */
1574 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue005, TestSize.Level3)
1575 {
1576     std::vector<std::string> devices;
1577     devices.push_back(g_deviceB->GetDeviceId());
1578     devices.push_back(g_deviceC->GetDeviceId());
1579     /**
1580      * @tc.steps:step1. New a thread to deviceA call block push sync to deviceB & deviceC,
1581      *      but deviceB & C is offline
1582      * @tc.expected: step1. Sync will be blocked util timeout, and then return OK
1583      */
1584     g_deviceB->Offline();
1585     g_deviceC->Offline();
1586     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1587 
1588     /**
1589      * @tc.steps:step2. deviceA put {k1, v1}
1590      */
1591     g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1592 
1593     std::mutex lockMutex;
1594     std::condition_variable conditionVar;
1595 
__anonb4c415b50602() 1596     std::thread threadFirst([devices]() {
1597         std::map<std::string, DBStatus> resultInner;
1598         DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, resultInner, true);
1599         EXPECT_EQ(status, OK);
1600     });
1601     threadFirst.detach();
1602     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1603     /**
1604      * @tc.steps:step3. New a thread to deviceA call block push sync to deviceB & deviceC,
1605      *      but deviceB & C is offline
1606      * @tc.expected: step2. Sync will be blocked util timeout, and then return OK
1607      */
__anonb4c415b50702() 1608     std::thread threadSecond([devices, &lockMutex, &conditionVar]() {
1609         std::map<std::string, DBStatus> resultInner;
1610         DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, resultInner, true);
1611         EXPECT_EQ(status, OK);
1612         std::unique_lock<mutex> lockInner(lockMutex);
1613         conditionVar.notify_one();
1614     });
1615     threadSecond.detach();
1616 
1617     /**
1618      * @tc.steps:step4. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE,
1619      * @tc.expected: step1. Expect return OK, size eq 0.
1620      */
1621     int size;
1622     PragmaData input = static_cast<PragmaData>(&size);
1623     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), OK);
1624     EXPECT_EQ(size, 0);
1625 
1626     /**
1627      * @tc.steps:step5. wait exit
1628      */
1629     std::unique_lock<mutex> lock(lockMutex);
1630     auto now = std::chrono::system_clock::now();
1631     conditionVar.wait_until(lock, now + 2 * INT8_MAX * 1000ms);
1632 }
1633 
1634 /**
1635   * @tc.name: CalculateSyncData001
1636   * @tc.desc: Test sync data whose device never synced before
1637   * @tc.type: FUNC
1638   * @tc.require:
1639   * @tc.author: zhuwentao
1640   */
1641 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData001, TestSize.Level3)
1642 {
1643     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1644     size_t dataSize = g_kvDelegatePtr->GetSyncDataSize(DEVICE_B);
1645     uint32_t serialHeadLen = 8u;
1646     EXPECT_EQ(static_cast<uint32_t>(dataSize), 0u + serialHeadLen);
1647     uint32_t keySize = 256u;
1648     uint32_t valuesize = 1024u;
1649     uint32_t itemCount = 10u;
1650     CalculateDataTest(itemCount, keySize, valuesize);
1651 }
1652 
1653 /**
1654   * @tc.name: CalculateSyncData002
1655   * @tc.desc: Test sync data whose device synced before, but sync data is less than 1M
1656   * @tc.type: FUNC
1657   * @tc.require:
1658   * @tc.author: zhuwentao
1659   */
1660 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData002, TestSize.Level3)
1661 {
1662     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1663     Key key1 = {'1'};
1664     Value value1 = {'1'};
1665     EXPECT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
1666 
1667     std::vector<std::string> devices;
1668     devices.push_back(g_deviceB->GetDeviceId());
1669     std::map<std::string, DBStatus> result;
1670     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1671     ASSERT_TRUE(status == OK);
1672     ASSERT_TRUE(result.size() == devices.size());
1673     for (const auto &pair : result) {
1674         EXPECT_TRUE(pair.second == OK);
1675     }
1676 
1677     uint32_t keySize = 256u;
1678     uint32_t valuesize = 512u;
1679     uint32_t itemCount = 20u;
1680     CalculateDataTest(itemCount, keySize, valuesize);
1681 }
1682 
1683 /**
1684   * @tc.name: CalculateSyncData003
1685   * @tc.desc: Test sync data whose device synced before, but sync data is larger than 1M
1686   * @tc.type: FUNC
1687   * @tc.require:
1688   * @tc.author: zhuwentao
1689   */
1690 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData003, TestSize.Level3)
1691 {
1692     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1693     Key key1 = {'1'};
1694     Value value1 = {'1'};
1695     EXPECT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
1696 
1697     std::vector<std::string> devices;
1698     devices.push_back(g_deviceB->GetDeviceId());
1699     std::map<std::string, DBStatus> result;
1700     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1701     ASSERT_TRUE(status == OK);
1702     ASSERT_TRUE(result.size() == devices.size());
1703     for (const auto &pair : result) {
1704         EXPECT_TRUE(pair.second == OK);
1705     }
1706     uint32_t keySize = 256u;
1707     uint32_t valuesize = 1024u;
1708     uint32_t itemCount = 2048u;
1709     CalculateDataTest(itemCount, keySize, valuesize);
1710 }
1711 
1712 /**
1713   * @tc.name: CalculateSyncData004
1714   * @tc.desc: Test invalid device when call GetSyncDataSize interface
1715   * @tc.type: FUNC
1716   * @tc.require:
1717   * @tc.author: zhuwentao
1718   */
1719 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData004, TestSize.Level3)
1720 {
1721     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1722     std::string device;
1723     EXPECT_EQ(g_kvDelegatePtr->GetSyncDataSize(device), 0u);
1724 }
1725 
1726 /**
1727   * @tc.name: CalculateSyncData005
1728   * @tc.desc: Test CalculateSyncData and rekey Concurrently
1729   * @tc.type: FUNC
1730   * @tc.require:
1731   * @tc.author: zhuwentao
1732   */
1733 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData005, TestSize.Level3)
1734 {
1735     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1736     size_t dataSize = 0;
1737     Key key1 = {'1'};
1738     Value value1 = {'1'};
1739     EXPECT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
__anonb4c415b50802() 1740     std::thread thread1([]() {
1741         std::this_thread::sleep_for(std::chrono::milliseconds(1));
1742         CipherPassword passwd; // random password
1743         vector<uint8_t> passwdBuffer(10, 45);  // 10 and 45 as random password.
1744         passwd.SetValue(passwdBuffer.data(), passwdBuffer.size());
1745         g_kvDelegatePtr->Rekey(passwd);
1746     });
__anonb4c415b50902() 1747     std::thread thread2([&dataSize, &key1, &value1]() {
1748         dataSize = g_kvDelegatePtr->GetSyncDataSize(DEVICE_B);
1749         if (dataSize > 0) {
1750             uint32_t expectedDataSize = (key1.size() + value1.size());
1751             uint32_t externalSize = 70u;
1752             uint32_t serialHeadLen = 8u;
1753             ASSERT_GE(static_cast<uint32_t>(dataSize), expectedDataSize);
1754             ASSERT_LE(static_cast<uint32_t>(dataSize), serialHeadLen + expectedDataSize + externalSize);
1755         }
1756     });
1757     thread1.join();
1758     thread2.join();
1759 }
1760 
1761 /**
1762  * @tc.name: GetWaterMarkInfo001
1763  * @tc.desc: Test invalid dev for get water mark info.
1764  * @tc.type: FUNC
1765  * @tc.require:
1766  * @tc.author: zhangqiquan
1767  */
1768 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, GetWaterMarkInfo001, TestSize.Level0)
1769 {
1770     std::string dev;
1771     auto res = g_kvDelegatePtr->GetWatermarkInfo(dev);
1772     EXPECT_EQ(res.first, INVALID_ARGS);
1773     EXPECT_EQ(res.second.sendMark, 0u);
1774     EXPECT_EQ(res.second.receiveMark, 0u);
1775 
1776     dev = std::string(DBConstant::MAX_DEV_LENGTH + 1, 'a');
1777     res = g_kvDelegatePtr->GetWatermarkInfo(dev);
1778     EXPECT_EQ(res.first, INVALID_ARGS);
1779     EXPECT_EQ(res.second.sendMark, 0u);
1780     EXPECT_EQ(res.second.receiveMark, 0u);
1781 }
1782 
1783 /**
1784  * @tc.name: QuerySyncRetry001
1785  * @tc.desc: use sync retry sync use query pull by compress
1786  * @tc.type: FUNC
1787  * @tc.require:
1788  * @tc.author: zhangqiquan
1789  */
1790 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, QuerySyncRetry001, TestSize.Level3)
1791 {
1792     if (g_kvDelegatePtr != nullptr) {
1793         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1794         g_kvDelegatePtr = nullptr;
1795     }
1796     /**
1797      * @tc.steps: step1. open db use Compress
1798      * @tc.expected: step1, Pragma return OK.
1799      */
1800     KvStoreNbDelegate::Option option;
1801     option.isNeedCompressOnSync = true;
1802     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1803     ASSERT_TRUE(g_kvDelegateStatus == OK);
1804     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1805 
1806     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, QUERY_SYNC_MESSAGE);
1807     std::vector<std::string> devices;
1808     devices.push_back(g_deviceB->GetDeviceId());
1809 
1810     /**
1811      * @tc.steps: step2. set sync retry
1812      * @tc.expected: step2, Pragma return OK.
1813      */
1814     int pragmaData = 1;
1815     PragmaData input = static_cast<PragmaData>(&pragmaData);
1816     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1817 
1818     /**
1819      * @tc.steps: step3. deviceA call sync and wait
1820      * @tc.expected: step3. sync should return OK.
1821      */
1822     std::map<std::string, DBStatus> result;
1823     auto query = Query::Select().PrefixKey({});
1824     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query) == OK);
1825 
1826     /**
1827      * @tc.expected: step4. onComplete should be called, and status is time_out
1828      */
1829     ASSERT_TRUE(result.size() == devices.size());
1830     for (const auto &pair : result) {
1831         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1832         EXPECT_EQ(pair.second, OK);
1833     }
1834     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1835 }
1836 }