• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <condition_variable>
17 #include <gtest/gtest.h>
18 #include <thread>
19 
20 #include "db_constant.h"
21 #include "db_common.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "platform_specific.h"
27 #include "runtime_config.h"
28 #include "single_ver_data_sync.h"
29 #include "single_ver_kv_sync_task_context.h"
30 
31 using namespace testing::ext;
32 using namespace DistributedDB;
33 using namespace DistributedDBUnitTest;
34 using namespace std;
35 
36 namespace {
37     string g_testDir;
38     const string STORE_ID = "kv_stroe_sync_test";
39     const int64_t TIME_OFFSET = 5000000;
40     const int WAIT_TIME = 1000;
41     const std::string DEVICE_A = "real_device";
42     const std::string DEVICE_B = "deviceB";
43     const std::string DEVICE_C = "deviceC";
44 
45     KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
46     KvStoreConfig g_config;
47     DistributedDBToolsUnitTest g_tool;
48     DBStatus g_kvDelegateStatus = INVALID_ARGS;
49     KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
50     VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
51     KvVirtualDevice *g_deviceB = nullptr;
52     KvVirtualDevice *g_deviceC = nullptr;
53 
54     // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
55     auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
56         placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
57 
CalculateDataTest(uint32_t itemCount,uint32_t keySize,uint32_t valueSize)58     void CalculateDataTest(uint32_t itemCount, uint32_t keySize, uint32_t valueSize)
59     {
60         for (uint32_t i = 0; i < itemCount; i++) {
61             std::vector<uint8_t> prefixKey = {'a', 'b', 'c'};
62             Key key = DistributedDBToolsUnitTest::GetRandPrefixKey(prefixKey, keySize);
63             Value value;
64             DistributedDBToolsUnitTest::GetRandomKeyValue(value, valueSize);
65             EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
66         }
67         size_t dataSize = g_kvDelegatePtr->GetSyncDataSize(DEVICE_B);
68         uint32_t expectedDataSize = (valueSize + keySize);
69         uint32_t externalSize = 70u;
70         uint32_t serialHeadLen = 8u;
71         LOGI("expectedDataSize=%u, v=%u", expectedDataSize, externalSize);
72         uint32_t maxDataSize = 1024u * 1024u;
73         if (itemCount * expectedDataSize >= maxDataSize) {
74             EXPECT_EQ(static_cast<uint32_t>(dataSize), maxDataSize);
75             return;
76         }
77         ASSERT_GE(static_cast<uint32_t>(dataSize), itemCount * expectedDataSize);
78         ASSERT_LE(static_cast<uint32_t>(dataSize), serialHeadLen + itemCount * (expectedDataSize + externalSize));
79     }
80 
81 class DistributedDBSingleVerP2PSimpleSyncTest : public testing::Test {
82 public:
83     static void SetUpTestCase(void);
84     static void TearDownTestCase(void);
85     void SetUp();
86     void TearDown();
87 };
88 
SetUpTestCase(void)89 void DistributedDBSingleVerP2PSimpleSyncTest::SetUpTestCase(void)
90 {
91     /**
92      * @tc.setup: Init datadir and Virtual Communicator.
93      */
94     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
95     g_config.dataDir = g_testDir;
96     g_mgr.SetKvStoreConfig(g_config);
97 
98     string dir = g_testDir + "/single_ver";
99     DIR* dirTmp = opendir(dir.c_str());
100     if (dirTmp == nullptr) {
101         OS::MakeDBDirectory(dir);
102     } else {
103         closedir(dirTmp);
104     }
105 
106     g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
107     ASSERT_TRUE(g_communicatorAggregator != nullptr);
108     RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
109 }
110 
TearDownTestCase(void)111 void DistributedDBSingleVerP2PSimpleSyncTest::TearDownTestCase(void)
112 {
113     /**
114      * @tc.teardown: Release virtual Communicator and clear data dir.
115      */
116     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
117         LOGE("rm test db files error!");
118     }
119     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
120 }
121 
SetUp(void)122 void DistributedDBSingleVerP2PSimpleSyncTest::SetUp(void)
123 {
124     DistributedDBToolsUnitTest::PrintTestCaseInfo();
125     /**
126      * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
127      */
128     KvStoreNbDelegate::Option option;
129     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
130     ASSERT_TRUE(g_kvDelegateStatus == OK);
131     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
132     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
133     ASSERT_TRUE(g_deviceB != nullptr);
134     VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
135     ASSERT_TRUE(syncInterfaceB != nullptr);
136     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
137 
138     g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
139     ASSERT_TRUE(g_deviceC != nullptr);
140     VirtualSingleVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
141     ASSERT_TRUE(syncInterfaceC != nullptr);
142     ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
143 
144     auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
145         const std::string &deviceId, uint8_t flag) -> bool {
146             return true;
147         };
148     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
149 }
150 
TearDown(void)151 void DistributedDBSingleVerP2PSimpleSyncTest::TearDown(void)
152 {
153     /**
154      * @tc.teardown: Release device A, B, C
155      */
156     if (g_kvDelegatePtr != nullptr) {
157         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
158         g_kvDelegatePtr = nullptr;
159         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
160         LOGD("delete kv store status %d", status);
161         ASSERT_TRUE(status == OK);
162     }
163     if (g_deviceB != nullptr) {
164         delete g_deviceB;
165         g_deviceB = nullptr;
166     }
167     if (g_deviceC != nullptr) {
168         delete g_deviceC;
169         g_deviceC = nullptr;
170     }
171     PermissionCheckCallbackV2 nullCallback;
172     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
173 }
174 
CheckWatermark(const std::string & dev,KvStoreNbDelegate * kvDelegatePtr,WatermarkInfo expectInfo,bool sendEqual=true,bool receiveEqual=true)175 void CheckWatermark(const std::string &dev, KvStoreNbDelegate *kvDelegatePtr, WatermarkInfo expectInfo,
176     bool sendEqual = true, bool receiveEqual = true)
177 {
178     auto [status, watermarkInfo] = kvDelegatePtr->GetWatermarkInfo(dev);
179     EXPECT_EQ(status, OK);
180     if (sendEqual) {
181         EXPECT_EQ(watermarkInfo.sendMark, expectInfo.sendMark);
182     } else {
183         EXPECT_NE(watermarkInfo.sendMark, expectInfo.sendMark);
184     }
185     if (receiveEqual) {
186         EXPECT_EQ(watermarkInfo.receiveMark, expectInfo.receiveMark);
187     } else {
188         EXPECT_NE(watermarkInfo.receiveMark, expectInfo.receiveMark);
189     }
190 }
191 
192 /**
193  * @tc.name: Normal Sync 001
194  * @tc.desc: Test normal push sync for add data.
195  * @tc.type: FUNC
196  * @tc.require: AR000CQS3S SR000CQE0B
197  * @tc.author: xushaohua
198  */
199 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync001, TestSize.Level1)
200 {
201     DBStatus status = OK;
202     std::vector<std::string> devices;
203     devices.push_back(g_deviceB->GetDeviceId());
204     devices.push_back(g_deviceC->GetDeviceId());
205 
206     /**
207      * @tc.steps: step1. deviceA put {k1, v1}
208      */
209     Key key = {'1'};
210     Value value = {'1'};
211 
212     WatermarkInfo info;
213     CheckWatermark(g_deviceB->GetDeviceId(), g_kvDelegatePtr, info);
214     CheckWatermark(g_deviceC->GetDeviceId(), g_kvDelegatePtr, info);
215     status = g_kvDelegatePtr->Put(key, value);
216     ASSERT_TRUE(status == OK);
217 
218     /**
219      * @tc.steps: step2. deviceA call sync and wait
220      * @tc.expected: step2. sync should return OK.
221      */
222     std::map<std::string, DBStatus> result;
223     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
224     ASSERT_TRUE(status == OK);
225     CheckWatermark(g_deviceB->GetDeviceId(), g_kvDelegatePtr, info, false);
226     CheckWatermark(g_deviceC->GetDeviceId(), g_kvDelegatePtr, info, false);
227 
228     /**
229      * @tc.expected: step2. onComplete should be called, DeviceB,C have {k1,v1}
230      */
231     ASSERT_TRUE(result.size() == devices.size());
232     for (const auto &pair : result) {
233         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
234         EXPECT_TRUE(pair.second == OK);
235     }
236     VirtualDataItem item;
237     g_deviceB->GetData(key, item);
238     EXPECT_TRUE(item.value == value);
239     g_deviceC->GetData(key, item);
240     EXPECT_TRUE(item.value == value);
241 }
242 
243 /**
244  * @tc.name: Normal Sync 002
245  * @tc.desc: Test normal push sync for update data.
246  * @tc.type: FUNC
247  * @tc.require: AR000CCPOM
248  * @tc.author: xushaohua
249  */
250 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync002, TestSize.Level1)
251 {
252     DBStatus status = OK;
253     std::vector<std::string> devices;
254     devices.push_back(g_deviceB->GetDeviceId());
255     devices.push_back(g_deviceC->GetDeviceId());
256 
257     /**
258      * @tc.steps: step1. deviceA put {k1, v1}
259      */
260     Key key = {'1'};
261     Value value = {'1'};
262     status = g_kvDelegatePtr->Put(key, value);
263     ASSERT_TRUE(status == OK);
264 
265     /**
266      * @tc.steps: step2. deviceA put {k1, v2}
267      */
268     Value value2;
269     value2.push_back('2');
270     status = g_kvDelegatePtr->Put(key, value2);
271     ASSERT_TRUE(status == OK);
272 
273     /**
274      * @tc.steps: step3. deviceA call sync and wait
275      * @tc.expected: step3. sync should return OK.
276      */
277     std::map<std::string, DBStatus> result;
278     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
279     ASSERT_TRUE(status == OK);
280 
281     /**
282      * @tc.expected: step3. onComplete should be called, DeviceB,C have {k1,v2}
283      */
284     ASSERT_TRUE(result.size() == devices.size());
285     for (const auto &pair : result) {
286         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
287         EXPECT_TRUE(pair.second == OK);
288     }
289     VirtualDataItem item;
290     g_deviceC->GetData(key, item);
291     EXPECT_TRUE(item.value == value2);
292     g_deviceB->GetData(key, item);
293     EXPECT_TRUE(item.value == value2);
294 }
295 
296 /**
297  * @tc.name: Normal Sync 003
298  * @tc.desc: Test normal push sync for delete data.
299  * @tc.type: FUNC
300  * @tc.require: AR000CQS3S
301  * @tc.author: xushaohua
302  */
303 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync003, TestSize.Level1)
304 {
305     DBStatus status = OK;
306     std::vector<std::string> devices;
307     devices.push_back(g_deviceB->GetDeviceId());
308     devices.push_back(g_deviceC->GetDeviceId());
309 
310     /**
311      * @tc.steps: step1. deviceA put {k1, v1}
312      */
313     Key key = {'1'};
314     Value value = {'1'};
315     status = g_kvDelegatePtr->Put(key, value);
316     ASSERT_TRUE(status == OK);
317 
318     /**
319      * @tc.steps: step2. deviceA delete k1
320      */
321     status = g_kvDelegatePtr->Delete(key);
322     ASSERT_TRUE(status == OK);
323     std::map<std::string, DBStatus> result;
324     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
325     ASSERT_TRUE(status == OK);
326 
327     /**
328      * @tc.steps: step3. deviceA call sync and wait
329      * @tc.expected: step3. sync should return OK.
330      */
331     ASSERT_TRUE(result.size() == devices.size());
332     for (const auto &pair : result) {
333         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
334         EXPECT_TRUE(pair.second == OK);
335     }
336 
337     /**
338      * @tc.expected: step3. onComplete should be called, DeviceB,C have {k1, delete}
339      */
340     VirtualDataItem item;
341     Key hashKey;
342     DistributedDBToolsUnitTest::CalcHash(key, hashKey);
343     EXPECT_EQ(g_deviceB->GetData(hashKey, item), -E_NOT_FOUND);
344     EXPECT_EQ(g_deviceC->GetData(hashKey, item), -E_NOT_FOUND);
345 }
346 
347 /**
348  * @tc.name: Normal Sync 004
349  * @tc.desc: Test normal pull sync for add data.
350  * @tc.type: FUNC
351  * @tc.require: AR000CCPOM
352  * @tc.author: xushaohua
353  */
354 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync004, TestSize.Level1)
355 {
356     DBStatus status = OK;
357     std::vector<std::string> devices;
358     devices.push_back(g_deviceB->GetDeviceId());
359     devices.push_back(g_deviceC->GetDeviceId());
360 
361     WatermarkInfo info;
362     CheckWatermark(g_deviceB->GetDeviceId(), g_kvDelegatePtr, info);
363     CheckWatermark(g_deviceC->GetDeviceId(), g_kvDelegatePtr, info);
364     /**
365      * @tc.steps: step1. deviceB put {k1, v1}
366      */
367     Key key = {'1'};
368     Value value = {'1'};
369     g_deviceB->PutData(key, value, 0, 0);
370 
371     /**
372      * @tc.steps: step2. deviceB put {k2, v2}
373      */
374     Key key2 = {'2'};
375     Value value2 = {'2'};
376     g_deviceC->PutData(key2, value2, 0, 0);
377     ASSERT_TRUE(status == OK);
378 
379     /**
380      * @tc.steps: step3. deviceA call pull sync
381      * @tc.expected: step3. sync should return OK.
382      */
383     std::map<std::string, DBStatus> result;
384     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
385     ASSERT_TRUE(status == OK);
386     CheckWatermark(g_deviceB->GetDeviceId(), g_kvDelegatePtr, info, true, false);
387     CheckWatermark(g_deviceC->GetDeviceId(), g_kvDelegatePtr, info, true, false);
388 
389     /**
390      * @tc.expected: step3. onComplete should be called, DeviceA have {k1, VALUE_1}, {K2. VALUE_2}
391      */
392     ASSERT_TRUE(result.size() == devices.size());
393     for (const auto &pair : result) {
394         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
395         EXPECT_TRUE(pair.second == OK);
396     }
397     Value value3;
398     EXPECT_EQ(g_kvDelegatePtr->Get(key, value3), OK);
399     EXPECT_EQ(value3, value);
400     EXPECT_EQ(g_kvDelegatePtr->Get(key2, value3), OK);
401     EXPECT_EQ(value3, value2);
402 }
403 
404 /**
405  * @tc.name: Normal Sync 005
406  * @tc.desc: Test normal pull sync for update data.
407  * @tc.type: FUNC
408  * @tc.require: AR000CCPOM SR000CQE10
409  * @tc.author: xushaohua
410  */
411 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync005, TestSize.Level2)
412 {
413     DBStatus status = OK;
414     std::vector<std::string> devices;
415     devices.push_back(g_deviceB->GetDeviceId());
416     devices.push_back(g_deviceC->GetDeviceId());
417 
418     /**
419      * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
420      */
421     Key key1 = {'1'};
422     Value value1 = {'1'};
423     status = g_kvDelegatePtr->Put(key1, value1);
424     ASSERT_TRUE(status == OK);
425     Key key2 = {'2'};
426     Value value2 = {'2'};
427     status = g_kvDelegatePtr->Put(key2, value2);
428     ASSERT_TRUE(status == OK);
429 
430     /**
431      * @tc.steps: step2. deviceB put {k1, v3} t2, t2 > t1
432      */
433     Value value3;
434     value3.push_back('3');
435     g_deviceB->PutData(key1, value3,
436         TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 0);
437 
438     /**
439      * @tc.steps: step3. deviceC put {k2, v4} t2, t4 < t1
440      */
441     Value value4;
442     value4.push_back('4');
443     g_deviceC->PutData(key2, value4,
444         TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 0);
445 
446     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
447     /**
448      * @tc.steps: step4. deviceA call pull sync
449      * @tc.expected: step4. sync should return OK.
450      */
451     std::map<std::string, DBStatus> result;
452     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
453     ASSERT_TRUE(status == OK);
454 
455     /**
456      * @tc.expected: step4. onComplete should be called, DeviceA have {k1, v3}, {k2. v2}
457      */
458     ASSERT_TRUE(result.size() == devices.size());
459     for (const auto &pair : result) {
460         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
461         EXPECT_TRUE(pair.second == OK);
462     }
463 
464     Value value5;
465     g_kvDelegatePtr->Get(key1, value5);
466     EXPECT_TRUE(value5 == value3);
467     g_kvDelegatePtr->Get(key2, value5);
468     EXPECT_TRUE(value5 == value2);
469 }
470 
471 /**
472  * @tc.name: Normal Sync 006
473  * @tc.desc: Test normal pull sync for delete data.
474  * @tc.type: FUNC
475  * @tc.require: AR000CQS3S
476  * @tc.author: xushaohua
477  */
478 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync006, TestSize.Level2)
479 {
480     /**
481      * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
482      */
483     Key key1 = {'1'};
484     Value value1 = {'1'};
485     DBStatus status = g_kvDelegatePtr->Put(key1, value1);
486     ASSERT_TRUE(status == OK);
487     Key key2 = {'2'};
488     Value value2 = {'2'};
489     status = g_kvDelegatePtr->Put(key2, value2);
490     ASSERT_TRUE(status == OK);
491 
492     /**
493      * @tc.steps: step2. deviceA put {k1, delete} t2, t2 <t1
494      */
495     Key hashKey1;
496     DistributedDBToolsUnitTest::CalcHash(key1, hashKey1);
497     g_deviceB->PutData(hashKey1, value1,
498         TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 1);
499 
500     /**
501      * @tc.steps: step3. deviceA put {k1, delete} t3, t3 < t1
502      */
503     Key hashKey2;
504     DistributedDBToolsUnitTest::CalcHash(key2, hashKey2);
505     g_deviceC->PutData(hashKey2, value1,
506         TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 0);
507 
508     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
509     /**
510      * @tc.steps: step4. deviceA call pull sync
511      * @tc.expected: step4. sync should return OK.
512      */
513     std::map<std::string, DBStatus> result;
514     std::vector<std::string> devices;
515     devices.push_back(g_deviceB->GetDeviceId());
516     devices.push_back(g_deviceC->GetDeviceId());
517     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
518     ASSERT_TRUE(status == OK);
519 
520     /**
521      * @tc.expected: step4. onComplete should be called, DeviceA have {k2. v2} don't have k1
522      */
523     ASSERT_TRUE(result.size() == devices.size());
524     for (const auto &pair : result) {
525         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
526         EXPECT_TRUE(pair.second == OK);
527     }
528     Value value5;
529     g_kvDelegatePtr->Get(key1, value5);
530     EXPECT_TRUE(value5.empty());
531     g_kvDelegatePtr->Get(key2, value5);
532     EXPECT_TRUE(value5 == value2);
533 }
534 
535 /**
536  * @tc.name: Normal Sync 007
537  * @tc.desc: Test normal push_pull sync for add data.
538  * @tc.type: FUNC
539  * @tc.require: AR000CCPOM
540  * @tc.author: xushaohua
541  */
542 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync007, TestSize.Level1)
543 {
544     DBStatus status = OK;
545     std::vector<std::string> devices;
546     devices.push_back(g_deviceB->GetDeviceId());
547     devices.push_back(g_deviceC->GetDeviceId());
548 
549     /**
550      * @tc.steps: step1. deviceA put {k1, v1}
551      */
552     Key key1 = {'1'};
553     Value value1 = {'1'};
554     status = g_kvDelegatePtr->Put(key1, value1);
555     EXPECT_TRUE(status == OK);
556 
557     /**
558      * @tc.steps: step1. deviceB put {k2, v2}
559      */
560     Key key2 = {'2'};
561     Value value2 = {'2'};
562     g_deviceB->PutData(key2, value2, 0, 0);
563 
564     /**
565      * @tc.steps: step1. deviceB put {k3, v3}
566      */
567     Key key3 = {'3'};
568     Value value3 = {'3'};
569     g_deviceC->PutData(key3, value3, 0, 0);
570 
571     /**
572      * @tc.steps: step4. deviceA call push_pull sync
573      * @tc.expected: step4. sync should return OK.
574      */
575     std::map<std::string, DBStatus> result;
576     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
577     ASSERT_TRUE(status == OK);
578 
579     ASSERT_TRUE(result.size() == devices.size());
580     for (const auto &pair : result) {
581         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
582         EXPECT_TRUE(pair.second == OK);
583     }
584 
585     /**
586      * @tc.expected: step4. onComplete should be called, DeviceA have {k1. v1}, {k2, v2}, {k3, v3}
587      *     deviceB received {k1. v1}, don't received k3, deviceC received {k1. v1}, don't received k2
588      */
589     Value value4;
590     g_kvDelegatePtr->Get(key2, value4);
591     EXPECT_TRUE(value4 == value2);
592     g_kvDelegatePtr->Get(key3, value4);
593     EXPECT_TRUE(value4 == value3);
594 
595     VirtualDataItem item1;
596     g_deviceB->GetData(key1, item1);
597     EXPECT_TRUE(item1.value == value1);
598     item1.value.clear();
599     g_deviceB->GetData(key3, item1);
600     EXPECT_TRUE(item1.value.empty());
601 
602     VirtualDataItem item2;
603     g_deviceC->GetData(key1, item2);
604     EXPECT_TRUE(item2.value == value1);
605     item2.value.clear();
606     g_deviceC->GetData(key2, item2);
607     EXPECT_TRUE(item2.value.empty());
608 }
609 
610 /**
611  * @tc.name: Normal Sync 008
612  * @tc.desc: Test normal push_pull sync for update data.
613  * @tc.type: FUNC
614  * @tc.require: AR000CCPOM
615  * @tc.author: xushaohua
616  */
617 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync008, TestSize.Level2)
618 {
619     DBStatus status = OK;
620     std::vector<std::string> devices;
621     devices.push_back(g_deviceB->GetDeviceId());
622     devices.push_back(g_deviceC->GetDeviceId());
623 
624     /**
625      * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
626      */
627     Key key1 = {'1'};
628     Value value1 = {'1'};
629     status = g_kvDelegatePtr->Put(key1, value1);
630     ASSERT_TRUE(status == OK);
631 
632     Key key2 = {'2'};
633     Value value2 = {'2'};
634     status = g_kvDelegatePtr->Put(key2, value2);
635     ASSERT_TRUE(status == OK);
636 
637     /**
638      * @tc.steps: step2. deviceB put {k1, v3} t2, t2 > t1
639      */
640     Value value3 = {'3'};
641     g_deviceB->PutData(key1, value3,
642         TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 0);
643 
644     /**
645      * @tc.steps: step3. deviceB put {k1, v4} t3, t4 <t1
646      */
647     Value value4 = {'4'};
648     g_deviceC->PutData(key2, value4,
649         TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 0);
650     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
651 
652     /**
653      * @tc.steps: step4. deviceA call push_pull sync
654      * @tc.expected: step4. sync should return OK.
655      */
656     std::map<std::string, DBStatus> result;
657     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
658     ASSERT_TRUE(status == OK);
659     ASSERT_TRUE(result.size() == devices.size());
660     for (const auto &pair : result) {
661         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
662         EXPECT_TRUE(pair.second == OK);
663     }
664 
665     /**
666      * @tc.expected: step4. onComplete should be called, DeviceA have {k1. v3}, {k2, v2}
667      *     deviceB have {k1. v3}, deviceC have {k2. v2}
668      */
669     Value value5;
670     g_kvDelegatePtr->Get(key1, value5);
671     EXPECT_EQ(value5, value3);
672     g_kvDelegatePtr->Get(key2, value5);
673     EXPECT_EQ(value5, value2);
674 
675     VirtualDataItem item1;
676     g_deviceB->GetData(key1, item1);
677     EXPECT_TRUE(item1.value == value3);
678     item1.value.clear();
679     g_deviceB->GetData(key2, item1);
680     EXPECT_TRUE(item1.value == value2);
681 
682     VirtualDataItem item2;
683     g_deviceC->GetData(key2, item2);
684     EXPECT_TRUE(item2.value == value2);
685 }
686 
687 /**
688  * @tc.name: Normal Sync 009
689  * @tc.desc: Test normal push_pull sync for delete data.
690  * @tc.type: FUNC
691  * @tc.require: AR000CCPOM
692  * @tc.author: xushaohua
693  */
694 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync009, TestSize.Level2)
695 {
696     DBStatus status = OK;
697     std::vector<std::string> devices;
698     devices.push_back(g_deviceB->GetDeviceId());
699     devices.push_back(g_deviceC->GetDeviceId());
700 
701     /**
702      * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
703      */
704     Key key1 = {'1'};
705     Value value1 = {'1'};
706     status = g_kvDelegatePtr->Put(key1, value1);
707     ASSERT_TRUE(status == OK);
708 
709     Key key2 = {'2'};
710     Value value2 = {'2'};
711     status = g_kvDelegatePtr->Put(key2, value2);
712     ASSERT_TRUE(status == OK);
713 
714     /**
715      * @tc.steps: step2. deviceB put {k1, delete} t2, t2 > t1
716      */
717     Key hashKey1;
718     DistributedDBToolsUnitTest::CalcHash(key1, hashKey1);
719     g_deviceB->PutData(hashKey1, value1,
720         TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 1);
721 
722     /**
723      * @tc.steps: step3. deviceB put {k1, delete} t3, t2 < t1
724      */
725     Key hashKey2;
726     DistributedDBToolsUnitTest::CalcHash(key2, hashKey2);
727     g_deviceC->PutData(hashKey2, value2,
728         TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 1);
729 
730     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
731     /**
732      * @tc.steps: step4. deviceA call push_pull sync
733      * @tc.expected: step4. sync should return OK.
734      */
735     std::map<std::string, DBStatus> result;
736     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
737     ASSERT_TRUE(status == OK);
738 
739     /**
740      * @tc.expected: step4. onComplete should be called, DeviceA have {k1. delete}, {k2, v2}
741      *     deviceB have {k2. v2}, deviceC have {k2. v2}
742      */
743     ASSERT_TRUE(result.size() == devices.size());
744     for (const auto &pair : result) {
745         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
746         EXPECT_TRUE(pair.second == OK);
747     }
748 
749     Value value3;
750     g_kvDelegatePtr->Get(key1, value3);
751     EXPECT_TRUE(value3.empty());
752     value3.clear();
753     g_kvDelegatePtr->Get(key2, value3);
754     EXPECT_TRUE(value3 == value2);
755 
756     VirtualDataItem item1;
757     g_deviceB->GetData(key2, item1);
758     EXPECT_TRUE(item1.value == value2);
759 
760     VirtualDataItem item2;
761     g_deviceC->GetData(key2, item2);
762     EXPECT_TRUE(item2.value == value2);
763 }
764 
765 /**
766  * @tc.name: Normal Sync 010
767  * @tc.desc: Test sync failed by invalid devices.
768  * @tc.type: FUNC
769  * @tc.require: AR000CCPOM
770  * @tc.author: zhangqiquan
771  */
772 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync010, TestSize.Level1)
773 {
774     DBStatus status = OK;
775     std::vector<std::string> devices;
776     std::string invalidDev = std::string(DBConstant::MAX_DEV_LENGTH + 1, '0');
777     devices.push_back(DEVICE_A);
778     devices.push_back(g_deviceB->GetDeviceId());
779     devices.push_back(invalidDev);
780 
781     std::map<std::string, DBStatus> result;
782     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
783     ASSERT_TRUE(status == OK);
784 
785     ASSERT_EQ(result.size(), devices.size());
786     EXPECT_EQ(result[DEVICE_A], INVALID_ARGS);
787     EXPECT_EQ(result[invalidDev], INVALID_ARGS);
788     EXPECT_EQ(result[DEVICE_B], OK);
789 }
790 
791 /**
792  * @tc.name: Normal Sync 011
793  * @tc.desc: Test sync with translated id.
794  * @tc.type: FUNC
795  * @tc.require:
796  * @tc.author: zhangqiquan
797  */
798 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, NormalSync011, TestSize.Level0)
799 {
800     DBStatus status = OK;
801     std::vector<std::string> devices;
802     devices.push_back(g_deviceB->GetDeviceId());
803 
804     /**
805      * @tc.steps: step1. deviceA put {k1, v1}
806      */
807     Key key = {'1'};
808     Value value = {'1'};
809     status = g_kvDelegatePtr->Put(key, value);
810     ASSERT_TRUE(status == OK);
811 
812     /**
813      * @tc.steps: step2. ori dev will be append test
814      * @tc.expected: step2. sync should return OK.
815      */
__anone3be83740302(const std::string &oriDevId, const StoreInfo &) 816     RuntimeConfig::SetTranslateToDeviceIdCallback([](const std::string &oriDevId, const StoreInfo &) {
817         std::string dev = oriDevId + "test";
818         LOGI("translate %s to %s", oriDevId.c_str(), dev.c_str());
819         return dev;
820     });
821     std::map<std::string, DBStatus> result;
822     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
823     ASSERT_TRUE(status == OK);
824     RuntimeConfig::SetTranslateToDeviceIdCallback(nullptr);
825 
826     /**
827      * @tc.expected: step2. onComplete should be called, Send watermark should not be zero
828      */
829     ASSERT_TRUE(result.size() == devices.size());
830     for (const auto &pair : result) {
831         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
832         EXPECT_TRUE(pair.second == OK);
833     }
834     WatermarkInfo info;
835     CheckWatermark(g_deviceB->GetDeviceId(), g_kvDelegatePtr, info, false);
836     info = {};
837     std::string checkDev = g_deviceB->GetDeviceId() + "test";
838     CheckWatermark(checkDev, g_kvDelegatePtr, info, false);
839 }
840 
841 /**
842  * @tc.name: Limit Data Sync 001
843  * @tc.desc: Test sync limit key and value data
844  * @tc.type: FUNC
845  * @tc.require: AR000CCPOM
846  * @tc.author: xushaohua
847  */
848 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, LimitDataSync001, TestSize.Level1)
849 {
850     DBStatus status = OK;
851     std::vector<std::string> devices;
852     devices.push_back(g_deviceB->GetDeviceId());
853 
854     Key key1;
855     Value value1;
856     DistributedDBToolsUnitTest::GetRandomKeyValue(key1, DBConstant::MAX_KEY_SIZE + 1);
857     DistributedDBToolsUnitTest::GetRandomKeyValue(value1, DBConstant::MAX_VALUE_SIZE + 1);
858 
859     Key key2;
860     Value value2;
861     DistributedDBToolsUnitTest::GetRandomKeyValue(key2, DBConstant::MAX_KEY_SIZE);
862     DistributedDBToolsUnitTest::GetRandomKeyValue(value2, DBConstant::MAX_VALUE_SIZE);
863 
864     /**
865      * @tc.steps: step1. deviceB put {k1, v1}, K1 > 1k, v1 > 4M
866      */
867     g_deviceB->PutData(key1, value1, 0, 0);
868 
869     /**
870      * @tc.steps: step2. deviceB put {k2, v2}, K2 = 1k, v2 = 4M
871      */
872     g_deviceC->PutData(key2, value2, 0, 0);
873 
874     /**
875      * @tc.steps: step3. deviceA call pull sync from device B
876      * @tc.expected: step3. sync should return OK.
877      */
878     std::map<std::string, DBStatus> result;
879     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
880     ASSERT_TRUE(status == OK);
881 
882     /**
883      * @tc.expected: step3. onComplete should be called.
884      */
885     ASSERT_TRUE(result.size() == devices.size());
886     for (const auto &pair : result) {
887         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
888         if (pair.first == g_deviceB->GetDeviceId()) {
889             EXPECT_TRUE(pair.second != OK);
890         } else {
891             EXPECT_TRUE(pair.second == OK);
892         }
893     }
894 
895     /**
896      * @tc.steps: step4. deviceA call pull sync from deviceC
897      * @tc.expected: step4. sync should return OK.
898      */
899     devices.clear();
900     result.clear();
901     devices.push_back(g_deviceC->GetDeviceId());
902     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
903     ASSERT_TRUE(status == OK);
904 
905     /**
906      * @tc.expected: step4. onComplete should be called, DeviceA have {k2. v2}, don't have {k1, v1}
907      */
908     ASSERT_TRUE(result.size() == devices.size());
909     for (const auto &pair : result) {
910         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
911         EXPECT_TRUE(pair.second == OK);
912     }
913 
914     // Get value from A
915     Value valueRead;
916     EXPECT_TRUE(g_kvDelegatePtr->Get(key1, valueRead) != OK);
917     valueRead.clear();
918     EXPECT_EQ(g_kvDelegatePtr->Get(key2, valueRead), OK);
919     EXPECT_TRUE(valueRead == value2);
920 }
921 
922 /**
923  * @tc.name: Limit Data Sync 002
924  * @tc.desc: Test PutBatch with invalid entries and then call sync.
925  * @tc.type: FUNC
926  * @tc.require: DTS2024012914038
927  * @tc.author: mazhao
928  */
929 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, LimitDataSync002, TestSize.Level1)
930 {
931     DBStatus status = OK;
932     std::vector<std::string> devices;
933     devices.push_back(g_deviceB->GetDeviceId());
934     devices.push_back(g_deviceC->GetDeviceId());
935     Key legalKey;
936     DistributedDBToolsUnitTest::GetRandomKeyValue(legalKey, DBConstant::MAX_KEY_SIZE); // 1K
937     Value legalValue;
938     DistributedDBToolsUnitTest::GetRandomKeyValue(legalValue, DBConstant::MAX_VALUE_SIZE); // 4M
939     Value emptyValue; // 0k
940     vector<Entry> illegalEntrys; // size is 512M + 1KB
941     for (int i = 0; i < 127; i++) { // 127 * (legalValue + legalKey) is equal to 508M + 127KB < 512M.
942         illegalEntrys.push_back({legalKey, legalValue});
943     }
944     for (int i = 0; i < 3970; i++) { // 3970 * legalKey is equal to 3970KB.
945         illegalEntrys.push_back({legalKey, emptyValue});
946     }
947     /**
948      * @tc.steps: step1. PutBatch with invalid entries inside which total length of the key and valud is more than 512M
949      * @tc.expected: step1. PutBatch should return INVALID_ARGS.
950      */
951     EXPECT_EQ(g_kvDelegatePtr->PutBatch(illegalEntrys), INVALID_ARGS);
952     /**
953      * @tc.steps: step2. deviceA call push_pull sync
954      * @tc.expected: step2. sync return OK and all statuses is OK.
955      */
956     std::map<std::string, DBStatus> result;
957     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
958     ASSERT_TRUE(status == OK);
959     ASSERT_TRUE(result.size() == devices.size());
960     for (const auto &pair : result) {
961         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
962         printf("dev %s, status %d", pair.first.c_str(), pair.second);
963         EXPECT_TRUE(pair.second == OK);
964     }
965 }
966 
967 /**
968  * @tc.name: Auto Sync 001
969  * @tc.desc: Verify auto sync enable function.
970  * @tc.type: FUNC
971  * @tc.require: AR000CKRTD AR000CQE0E
972  * @tc.author: xushaohua
973  */
974 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, AutoSync001, TestSize.Level1)
975 {
976     std::vector<std::string> devices;
977     devices.push_back(g_deviceB->GetDeviceId());
978     devices.push_back(g_deviceC->GetDeviceId());
979 
980     /**
981      * @tc.steps: step1. enable auto sync
982      * @tc.expected: step1, Pragma return OK.
983      */
984     bool autoSync = true;
985     PragmaData data = static_cast<PragmaData>(&autoSync);
986     DBStatus status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
987     ASSERT_EQ(status, OK);
988 
989     /**
990      * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
991      */
992     ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
993     ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_2, VALUE_2) == OK);
994 
995     /**
996      * @tc.steps: step3. sleep for data sync
997      * @tc.expected: step3. deviceB,C has {k1, v1}, {k2, v2}
998      */
999     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1000     VirtualDataItem item;
1001     g_deviceB->GetData(KEY_1, item);
1002     EXPECT_EQ(item.value, VALUE_1);
1003     g_deviceB->GetData(KEY_2, item);
1004     EXPECT_EQ(item.value, VALUE_2);
1005     g_deviceC->GetData(KEY_1, item);
1006     EXPECT_EQ(item.value, VALUE_1);
1007     g_deviceC->GetData(KEY_2, item);
1008     EXPECT_EQ(item.value, VALUE_2);
1009 }
1010 
1011 /**
1012  * @tc.name: Auto Sync 002
1013  * @tc.desc: Verify auto sync disable function.
1014  * @tc.type: FUNC
1015  * @tc.require: AR000CKRTD AR000CQE0E
1016  * @tc.author: xushaohua
1017  */
1018 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, AutoSync002, TestSize.Level1)
1019 {
1020     std::vector<std::string> devices;
1021     devices.push_back(g_deviceB->GetDeviceId());
1022     devices.push_back(g_deviceC->GetDeviceId());
1023 
1024     /**
1025      * @tc.steps: step1. disable auto sync
1026      * @tc.expected: step1, Pragma return OK.
1027      */
1028     bool autoSync = false;
1029     PragmaData data = static_cast<PragmaData>(&autoSync);
1030     DBStatus status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1031     ASSERT_EQ(status, OK);
1032 
1033     /**
1034      * @tc.steps: step2. deviceB put {k1, v1}, deviceC put {k2, v2}
1035      */
1036     g_deviceB->PutData(KEY_1, VALUE_1, 0, 0);
1037     g_deviceC->PutData(KEY_2, VALUE_2, 0, 0);
1038 
1039     /**
1040      * @tc.steps: step3. sleep for data sync
1041      * @tc.expected: step3. deviceA don't have k1, k2.
1042      */
1043     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1044     Value value3;
1045     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_1, value3) == NOT_FOUND);
1046     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_2, value3) == NOT_FOUND);
1047 }
1048 
1049 /**
1050  * @tc.name: Block Sync 001
1051  * @tc.desc: Verify block push sync function.
1052  * @tc.type: FUNC
1053  * @tc.require: AR000CKRTD AR000CQE0E
1054  * @tc.author: xushaohua
1055  */
1056 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync001, TestSize.Level1)
1057 {
1058     std::vector<std::string> devices;
1059     devices.push_back(g_deviceB->GetDeviceId());
1060     devices.push_back(g_deviceC->GetDeviceId());
1061 
1062     /**
1063      * @tc.steps: step1. deviceA put {k1, v1}
1064      */
1065     g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1066 
1067     /**
1068      * @tc.steps: step2. deviceA call block push sync to deviceB & deviceC.
1069      * @tc.expected: step2. Sync return OK, devices status OK, deviceB & deivceC has {k1, v1}.
1070      */
1071     std::map<std::string, DBStatus> result;
1072     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1073     ASSERT_EQ(status, OK);
1074     ASSERT_TRUE(result.size() == devices.size());
1075     for (const auto &pair : result) {
1076         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1077         EXPECT_TRUE(pair.second == OK);
1078     }
1079     VirtualDataItem item1;
1080     EXPECT_EQ(g_deviceB->GetData(KEY_1, item1), E_OK);
1081     EXPECT_EQ(item1.value, VALUE_1);
1082     VirtualDataItem item2;
1083     EXPECT_EQ(g_deviceC->GetData(KEY_1, item2), E_OK);
1084     EXPECT_EQ(item2.value, VALUE_1);
1085 }
1086 
1087 /**
1088  * @tc.name:  Block Sync 002
1089  * @tc.desc: Verify block pull sync function.
1090  * @tc.type: FUNC
1091  * @tc.require: AR000CKRTD AR000CQE0E
1092  * @tc.author: xushaohua
1093  */
1094 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync002, TestSize.Level1)
1095 {
1096     std::vector<std::string> devices;
1097     devices.push_back(g_deviceB->GetDeviceId());
1098     devices.push_back(g_deviceC->GetDeviceId());
1099 
1100     /**
1101      * @tc.steps: step1. deviceB put {k1, v1}, deviceC put {k2, v2}
1102      */
1103     g_deviceB->PutData(KEY_1, VALUE_1, 0, 0);
1104     g_deviceC->PutData(KEY_2, VALUE_2, 0, 0);
1105 
1106     /**
1107      * @tc.steps: step2. deviceA call block pull and pull sync to deviceB & deviceC.
1108      * @tc.expected: step2. Sync return OK, devices status OK, deviceA has {k1, v1}, {k2, v2}
1109      */
1110     std::map<std::string, DBStatus> result;
1111     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1112     ASSERT_EQ(status, OK);
1113     ASSERT_TRUE(result.size() == devices.size());
1114     for (const auto &pair : result) {
1115         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1116         EXPECT_TRUE(pair.second == OK);
1117     }
1118     Value value3;
1119     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_1, value3) == OK);
1120     EXPECT_TRUE(value3 == VALUE_1);
1121     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_2, value3) == OK);
1122     EXPECT_TRUE(value3 == VALUE_2);
1123 }
1124 
1125 /**
1126  * @tc.name:  Block Sync 003
1127  * @tc.desc: Verify block push and pull sync function.
1128  * @tc.type: FUNC
1129  * @tc.require: AR000CKRTD AR000CQE0E
1130  * @tc.author: xushaohua
1131  */
1132 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync003, TestSize.Level1)
1133 {
1134     std::vector<std::string> devices;
1135     devices.push_back(g_deviceB->GetDeviceId());
1136     devices.push_back(g_deviceC->GetDeviceId());
1137 
1138     /**
1139      * @tc.steps: step1. deviceA put {k1, v1}
1140      */
1141     g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1142 
1143     /**
1144      * @tc.steps: step2. deviceB put {k1, v1}, deviceB put {k2, v2}
1145      */
1146     g_deviceB->PutData(KEY_2, VALUE_2, 0, 0);
1147     g_deviceC->PutData(KEY_3, VALUE_3, 0, 0);
1148 
1149     /**
1150      * @tc.steps: step3. deviceA call block pull and pull sync to deviceB & deviceC.
1151      * @tc.expected: step3. Sync return OK, devices status OK, deviceA has {k1, v1}, {k2, v2} {k3, v3}
1152      *      deviceB has {k1, v1}, {k2. v2} , deviceC has {k1, v1}, {k3, v3}
1153      */
1154     std::map<std::string, DBStatus> result;
1155     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, true);
1156     ASSERT_EQ(status, OK);
1157     ASSERT_TRUE(result.size() == devices.size());
1158     for (const auto &pair : result) {
1159         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1160         EXPECT_TRUE(pair.second == OK);
1161     }
1162 
1163     VirtualDataItem item1;
1164     g_deviceB->GetData(KEY_1, item1);
1165     EXPECT_TRUE(item1.value == VALUE_1);
1166     g_deviceB->GetData(KEY_2, item1);
1167     EXPECT_TRUE(item1.value == VALUE_2);
1168 
1169     VirtualDataItem item2;
1170     g_deviceC->GetData(KEY_1, item2);
1171     EXPECT_TRUE(item2.value == VALUE_1);
1172     g_deviceC->GetData(KEY_3, item2);
1173     EXPECT_TRUE(item2.value == VALUE_3);
1174 
1175     Value value3;
1176     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_1, value3) == OK);
1177     EXPECT_TRUE(value3 == VALUE_1);
1178     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_2, value3) == OK);
1179     EXPECT_TRUE(value3 == VALUE_2);
1180     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_3, value3) == OK);
1181     EXPECT_TRUE(value3 == VALUE_3);
1182 }
1183 
1184 /**
1185  * @tc.name:  Block Sync 004
1186  * @tc.desc: Verify block sync function invalid args.
1187  * @tc.type: FUNC
1188  * @tc.require: AR000CKRTD AR000CQE0E
1189  * @tc.author: xushaohua
1190  */
1191 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync004, TestSize.Level2)
1192 {
1193     std::vector<std::string> devices;
1194 
1195     /**
1196      * @tc.steps: step1. deviceA put {k1, v1}
1197      */
1198     g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1199 
1200     /**
1201      * @tc.steps: step2. deviceA call block push sync to deviceB & deviceC.
1202      * @tc.expected: step2. Sync return INVALID_ARGS
1203      */
1204     std::map<std::string, DBStatus> result;
1205     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1206     EXPECT_EQ(status, INVALID_ARGS);
1207 
1208     /**
1209      * @tc.steps: step3. deviceB, deviceC offlinem and push deviceA sync to deviceB and deviceC.
1210      * @tc.expected: step3. Sync return OK, but the deviceB and deviceC are TIME_OUT
1211      */
1212     devices.push_back(g_deviceB->GetDeviceId());
1213     devices.push_back(g_deviceC->GetDeviceId());
1214     g_deviceB->Offline();
1215     g_deviceC->Offline();
1216 
1217     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1218     EXPECT_EQ(status, OK);
1219     ASSERT_TRUE(result.size() == devices.size());
1220     for (const auto &pair : result) {
1221         // If syncTaskContext of deviceB is scheduled to be executed first, ClearAllSyncTask is
1222         // invoked when OfflineHandleByDevice is triggered, and SyncOperation::Finished() is triggered in advance.
1223         // The returned status is COMM_FAILURE
1224         EXPECT_TRUE((pair.second == static_cast<DBStatus>(-E_PERIPHERAL_INTERFACE_FAIL)) ||
1225             (pair.second == COMM_FAILURE));
1226     }
1227 }
1228 
1229 /**
1230  * @tc.name:  Block Sync 005
1231  * @tc.desc: Verify block sync function busy.
1232  * @tc.type: FUNC
1233  * @tc.require: AR000CKRTD AR000CQE0E
1234  * @tc.author: xushaohua
1235  */
1236 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, BlockSync005, TestSize.Level2)
1237 {
1238     std::vector<std::string> devices;
1239     devices.push_back(g_deviceB->GetDeviceId());
1240     devices.push_back(g_deviceC->GetDeviceId());
1241     /**
1242      * @tc.steps: step1. deviceA put {k1, v1}
1243      */
1244     g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1245 
1246     /**
1247      * @tc.steps: step2. New a thread to deviceA call block push sync to deviceB & deviceC,
1248      *      but deviceB & C is blocked
1249      * @tc.expected: step2. Sync will be blocked util timeout, and then return OK
1250      */
1251     g_deviceB->Offline();
1252     g_deviceC->Offline();
__anone3be83740402() 1253     thread thread([devices]() {
1254         std::map<std::string, DBStatus> resultInner;
1255         DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, resultInner, true);
1256         EXPECT_EQ(status, OK);
1257     });
1258     thread.detach();
1259     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1260     /**
1261      * @tc.steps: step3. sleep 1s and call sync.
1262      * @tc.expected: step3. Sync will return BUSY.
1263      */
1264     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1265     std::map<std::string, DBStatus> result;
1266     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, true);
1267     EXPECT_EQ(status, OK);
1268 }
1269 
1270 /**
1271   * @tc.name: SyncQueue001
1272   * @tc.desc: Invalid args check of Pragma GET_QUEUED_SYNC_SIZE SET_QUEUED_SYNC_LIMIT and
1273   * GET_QUEUED_SYNC_LIMIT, expect return INVALID_ARGS.
1274   * @tc.type: FUNC
1275   * @tc.require: AR000D4876
1276   * @tc.author: wangchuanqing
1277   */
1278 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue001, TestSize.Level3)
1279 {
1280     /**
1281      * @tc.steps:step1. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE, and set param to be null
1282      * @tc.expected: step1. Expect return INVALID_ARGS.
1283      */
1284     int *param = nullptr;
1285     PragmaData input = static_cast<PragmaData>(param);
1286     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), INVALID_ARGS);
1287 
1288     /**
1289      * @tc.steps:step2. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be null
1290      * @tc.expected: step2. Expect return INVALID_ARGS.
1291      */
1292     input = static_cast<PragmaData>(param);
1293     EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1294 
1295     /**
1296      * @tc.steps:step3. Set PragmaCmd to be GET_QUEUED_SYNC_LIMIT, and set param to be null
1297      * @tc.expected: step3. Expect return INVALID_ARGS.
1298      */
1299     input = static_cast<PragmaData>(param);
1300     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1301 
1302     /**
1303      * @tc.steps:step4. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be QUEUED_SYNC_LIMIT_MIN - 1
1304      * @tc.expected: step4. Expect return INVALID_ARGS.
1305      */
1306     int limit = DBConstant::QUEUED_SYNC_LIMIT_MIN - 1;
1307     input = static_cast<PragmaData>(&limit);
1308     EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1309 
1310     /**
1311      * @tc.steps:step5. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be QUEUED_SYNC_LIMIT_MAX + 1
1312      * @tc.expected: step5. Expect return INVALID_ARGS.
1313      */
1314     limit = DBConstant::QUEUED_SYNC_LIMIT_MAX + 1;
1315     input = static_cast<PragmaData>(&limit);
1316     EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1317 }
1318 
1319 /**
1320   * @tc.name: SyncQueue002
1321   * @tc.desc: Pragma GET_QUEUED_SYNC_LIMIT and SET_QUEUED_SYNC_LIMIT
1322   * @tc.type: FUNC
1323   * @tc.require: AR000D4876
1324   * @tc.author: wangchuanqing
1325   */
1326 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue002, TestSize.Level3)
1327 {
1328     /**
1329      * @tc.steps:step1. Set PragmaCmd to be GET_QUEUED_SYNC_LIMIT,
1330      * @tc.expected: step1. Expect return OK, limit eq QUEUED_SYNC_LIMIT_DEFAULT.
1331      */
1332     int limit = 0;
1333     PragmaData input = static_cast<PragmaData>(&limit);
1334     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), OK);
1335     EXPECT_EQ(limit, DBConstant::QUEUED_SYNC_LIMIT_DEFAULT);
1336 
1337     /**
1338      * @tc.steps:step2. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be 50
1339      * @tc.expected: step2. Expect return OK.
1340      */
1341     limit = 50;
1342     input = static_cast<PragmaData>(&limit);
1343     EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), OK);
1344 
1345     /**
1346      * @tc.steps:step3. Set PragmaCmd to be GET_QUEUED_SYNC_LIMIT,
1347      * @tc.expected: step3. Expect return OK, limit eq 50
1348      */
1349     limit = 0;
1350     input = static_cast<PragmaData>(&limit);
1351     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), OK);
1352     EXPECT_EQ(limit, 50);
1353 }
1354 
1355 /**
1356   * @tc.name: SyncQueue003
1357   * @tc.desc: sync queue test
1358   * @tc.type: FUNC
1359   * @tc.require: AR000D4876
1360   * @tc.author: wangchuanqing
1361   */
1362 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue003, TestSize.Level3)
1363 {
1364     DBStatus status = OK;
1365     std::vector<std::string> devices;
1366     devices.push_back(g_deviceB->GetDeviceId());
1367     devices.push_back(g_deviceC->GetDeviceId());
1368 
1369     /**
1370      * @tc.steps:step1. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE,
1371      * @tc.expected: step1. Expect return OK, size eq 0.
1372      */
1373     int size;
1374     PragmaData input = static_cast<PragmaData>(&size);
1375     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), OK);
1376     EXPECT_EQ(size, 0);
1377 
1378     /**
1379      * @tc.steps:step2. deviceA put {k1, v1}
1380      */
1381     status = g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1382     ASSERT_TRUE(status == OK);
1383 
1384     /**
1385      * @tc.steps:step3. deviceA sync SYNC_MODE_PUSH_ONLY
1386      */
1387     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1388     ASSERT_TRUE(status == OK);
1389 
1390     /**
1391      * @tc.steps:step4. deviceA put {k2, v2}
1392      */
1393     status = g_kvDelegatePtr->Put(KEY_2, VALUE_2);
1394     ASSERT_TRUE(status == OK);
1395 
1396     /**
1397      * @tc.steps:step5. deviceA sync SYNC_MODE_PUSH_ONLY
1398      */
1399     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1400     ASSERT_TRUE(status == OK);
1401 
1402     /**
1403      * @tc.steps:step6. deviceB put {k3, v3}
1404      */
1405     g_deviceB->PutData(KEY_3, VALUE_3, 0, 0);
1406 
1407     /**
1408      * @tc.steps:step7. deviceA put {k4, v4}
1409      */
1410     status = g_kvDelegatePtr->Put(KEY_4, VALUE_4);
1411     ASSERT_TRUE(status == OK);
1412 
1413     /**
1414      * @tc.steps:step8. deviceA sync SYNC_MODE_PUSH_PULL
1415      */
1416     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_PULL, nullptr, false);
1417     ASSERT_TRUE(status == OK);
1418 
1419     /**
1420      * @tc.steps:step9. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE,
1421      * @tc.expected: step1. Expect return OK, 0 <= size <= 4
1422      */
1423     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), OK);
1424     ASSERT_TRUE((size >= 0) && (size <= 4));
1425 
1426     /**
1427      * @tc.steps:step10. deviceB put {k5, v5}
1428      */
1429     g_deviceB->PutData(KEY_5, VALUE_5, 0, 0);
1430 
1431     /**
1432      * @tc.steps:step11. deviceA call sync and wait
1433      * @tc.expected: step11. sync should return OK.
1434      */
1435     std::map<std::string, DBStatus> result;
1436     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
1437     ASSERT_TRUE(status == OK);
1438 
1439     /**
1440      * @tc.expected: step11. onComplete should be called, DeviceA,B,C have {k1,v1}~ {KEY_5,VALUE_5}
1441      */
1442     ASSERT_TRUE(result.size() == devices.size());
1443     for (const auto &pair : result) {
1444         EXPECT_TRUE(pair.second == OK);
1445     }
1446     VirtualDataItem item;
1447     g_deviceB->GetData(KEY_1, item);
1448     EXPECT_TRUE(item.value == VALUE_1);
1449     g_deviceB->GetData(KEY_2, item);
1450     EXPECT_TRUE(item.value == VALUE_2);
1451     g_deviceB->GetData(KEY_3, item);
1452     EXPECT_TRUE(item.value == VALUE_3);
1453     g_deviceB->GetData(KEY_4, item);
1454     EXPECT_TRUE(item.value == VALUE_4);
1455     g_deviceB->GetData(KEY_5, item);
1456     EXPECT_TRUE(item.value == VALUE_5);
1457     Value value;
1458     EXPECT_EQ(g_kvDelegatePtr->Get(KEY_3, value), OK);
1459     EXPECT_EQ(VALUE_3, value);
1460     EXPECT_EQ(g_kvDelegatePtr->Get(KEY_5, value), OK);
1461     EXPECT_EQ(VALUE_5, value);
1462 }
1463 
1464 /**
1465   * @tc.name: SyncQueue004
1466   * @tc.desc: sync queue full test
1467   * @tc.type: FUNC
1468   * @tc.require: AR000D4876
1469   * @tc.author: wangchuanqing
1470   */
1471 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue004, TestSize.Level3)
1472 {
1473     DBStatus status = OK;
1474     std::vector<std::string> devices;
1475     devices.push_back(g_deviceB->GetDeviceId());
1476     devices.push_back(g_deviceC->GetDeviceId());
1477 
1478     /**
1479      * @tc.steps:step1. deviceB C block
1480      */
1481     g_communicatorAggregator->SetBlockValue(true);
1482 
1483     /**
1484      * @tc.steps:step2. deviceA put {k1, v1}
1485      */
1486     status = g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1487     ASSERT_TRUE(status == OK);
1488 
1489     /**
1490      * @tc.steps:step3. deviceA sync QUEUED_SYNC_LIMIT_DEFAULT times
1491      * @tc.expected: step3. Expect return OK
1492      */
1493     for (int i = 0; i < DBConstant::QUEUED_SYNC_LIMIT_DEFAULT; i++) {
1494         status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1495         ASSERT_TRUE(status == OK);
1496     }
1497 
1498     /**
1499      * @tc.steps:step4. deviceA sync
1500      * @tc.expected: step4. Expect return BUSY
1501      */
1502     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1503     ASSERT_TRUE(status == BUSY);
1504     g_communicatorAggregator->SetBlockValue(false);
1505 }
1506 
1507 /**
1508   * @tc.name: SyncQueue005
1509   * @tc.desc: block sync queue test
1510   * @tc.type: FUNC
1511   * @tc.require: AR000D4876
1512   * @tc.author: wangchuanqing
1513   */
1514 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, SyncQueue005, TestSize.Level3)
1515 {
1516     std::vector<std::string> devices;
1517     devices.push_back(g_deviceB->GetDeviceId());
1518     devices.push_back(g_deviceC->GetDeviceId());
1519     /**
1520      * @tc.steps:step1. New a thread to deviceA call block push sync to deviceB & deviceC,
1521      *      but deviceB & C is offline
1522      * @tc.expected: step1. Sync will be blocked util timeout, and then return OK
1523      */
1524     g_deviceB->Offline();
1525     g_deviceC->Offline();
1526     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1527 
1528     /**
1529      * @tc.steps:step2. deviceA put {k1, v1}
1530      */
1531     g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1532 
1533     std::mutex lockMutex;
1534     std::condition_variable conditionVar;
1535 
__anone3be83740502() 1536     std::thread threadFirst([devices]() {
1537         std::map<std::string, DBStatus> resultInner;
1538         DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, resultInner, true);
1539         EXPECT_EQ(status, OK);
1540     });
1541     threadFirst.detach();
1542     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1543     /**
1544      * @tc.steps:step3. New a thread to deviceA call block push sync to deviceB & deviceC,
1545      *      but deviceB & C is offline
1546      * @tc.expected: step2. Sync will be blocked util timeout, and then return OK
1547      */
__anone3be83740602() 1548     std::thread threadSecond([devices, &lockMutex, &conditionVar]() {
1549         std::map<std::string, DBStatus> resultInner;
1550         DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, resultInner, true);
1551         EXPECT_EQ(status, OK);
1552         std::unique_lock<mutex> lockInner(lockMutex);
1553         conditionVar.notify_one();
1554     });
1555     threadSecond.detach();
1556 
1557     /**
1558      * @tc.steps:step4. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE,
1559      * @tc.expected: step1. Expect return OK, size eq 0.
1560      */
1561     int size;
1562     PragmaData input = static_cast<PragmaData>(&size);
1563     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), OK);
1564     EXPECT_EQ(size, 0);
1565 
1566     /**
1567      * @tc.steps:step5. wait exit
1568      */
1569     std::unique_lock<mutex> lock(lockMutex);
1570     auto now = std::chrono::system_clock::now();
1571     conditionVar.wait_until(lock, now + 2 * INT8_MAX * 1000ms);
1572 }
1573 
1574 /**
1575   * @tc.name: CalculateSyncData001
1576   * @tc.desc: Test sync data whose device never synced before
1577   * @tc.type: FUNC
1578   * @tc.require: AR000HI2JS
1579   * @tc.author: zhuwentao
1580   */
1581 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData001, TestSize.Level3)
1582 {
1583     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1584     size_t dataSize = g_kvDelegatePtr->GetSyncDataSize(DEVICE_B);
1585     uint32_t serialHeadLen = 8u;
1586     EXPECT_EQ(static_cast<uint32_t>(dataSize), 0u + serialHeadLen);
1587     uint32_t keySize = 256u;
1588     uint32_t valuesize = 1024u;
1589     uint32_t itemCount = 10u;
1590     CalculateDataTest(itemCount, keySize, valuesize);
1591 }
1592 
1593 /**
1594   * @tc.name: CalculateSyncData002
1595   * @tc.desc: Test sync data whose device synced before, but sync data is less than 1M
1596   * @tc.type: FUNC
1597   * @tc.require: AR000HI2JS
1598   * @tc.author: zhuwentao
1599   */
1600 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData002, TestSize.Level3)
1601 {
1602     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1603     Key key1 = {'1'};
1604     Value value1 = {'1'};
1605     EXPECT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
1606 
1607     std::vector<std::string> devices;
1608     devices.push_back(g_deviceB->GetDeviceId());
1609     std::map<std::string, DBStatus> result;
1610     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1611     ASSERT_TRUE(status == OK);
1612     ASSERT_TRUE(result.size() == devices.size());
1613     for (const auto &pair : result) {
1614         EXPECT_TRUE(pair.second == OK);
1615     }
1616 
1617     uint32_t keySize = 256u;
1618     uint32_t valuesize = 512u;
1619     uint32_t itemCount = 20u;
1620     CalculateDataTest(itemCount, keySize, valuesize);
1621 }
1622 
1623 /**
1624   * @tc.name: CalculateSyncData003
1625   * @tc.desc: Test sync data whose device synced before, but sync data is larger than 1M
1626   * @tc.type: FUNC
1627   * @tc.require: AR000HI2JS
1628   * @tc.author: zhuwentao
1629   */
1630 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData003, TestSize.Level3)
1631 {
1632     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1633     Key key1 = {'1'};
1634     Value value1 = {'1'};
1635     EXPECT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
1636 
1637     std::vector<std::string> devices;
1638     devices.push_back(g_deviceB->GetDeviceId());
1639     std::map<std::string, DBStatus> result;
1640     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1641     ASSERT_TRUE(status == OK);
1642     ASSERT_TRUE(result.size() == devices.size());
1643     for (const auto &pair : result) {
1644         EXPECT_TRUE(pair.second == OK);
1645     }
1646     uint32_t keySize = 256u;
1647     uint32_t valuesize = 1024u;
1648     uint32_t itemCount = 2048u;
1649     CalculateDataTest(itemCount, keySize, valuesize);
1650 }
1651 
1652 /**
1653   * @tc.name: CalculateSyncData004
1654   * @tc.desc: Test invalid device when call GetSyncDataSize interface
1655   * @tc.type: FUNC
1656   * @tc.require: AR000HI2JS
1657   * @tc.author: zhuwentao
1658   */
1659 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData004, TestSize.Level3)
1660 {
1661     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1662     std::string device;
1663     EXPECT_EQ(g_kvDelegatePtr->GetSyncDataSize(device), 0u);
1664 }
1665 
1666 /**
1667   * @tc.name: CalculateSyncData005
1668   * @tc.desc: Test CalculateSyncData and rekey Concurrently
1669   * @tc.type: FUNC
1670   * @tc.require: AR000HI2JS
1671   * @tc.author: zhuwentao
1672   */
1673 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, CalculateSyncData005, TestSize.Level3)
1674 {
1675     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1676     size_t dataSize = 0;
1677     Key key1 = {'1'};
1678     Value value1 = {'1'};
1679     EXPECT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
__anone3be83740702() 1680     std::thread thread1([]() {
1681         std::this_thread::sleep_for(std::chrono::milliseconds(1));
1682         CipherPassword passwd; // random password
1683         vector<uint8_t> passwdBuffer(10, 45);  // 10 and 45 as random password.
1684         passwd.SetValue(passwdBuffer.data(), passwdBuffer.size());
1685         g_kvDelegatePtr->Rekey(passwd);
1686     });
__anone3be83740802() 1687     std::thread thread2([&dataSize, &key1, &value1]() {
1688         dataSize = g_kvDelegatePtr->GetSyncDataSize(DEVICE_B);
1689         if (dataSize > 0) {
1690             uint32_t expectedDataSize = (key1.size() + value1.size());
1691             uint32_t externalSize = 70u;
1692             uint32_t serialHeadLen = 8u;
1693             ASSERT_GE(static_cast<uint32_t>(dataSize), expectedDataSize);
1694             ASSERT_LE(static_cast<uint32_t>(dataSize), serialHeadLen + expectedDataSize + externalSize);
1695         }
1696     });
1697     thread1.join();
1698     thread2.join();
1699 }
1700 
1701 /**
1702  * @tc.name: GetWaterMarkInfo001
1703  * @tc.desc: Test invalid dev for get water mark info.
1704  * @tc.type: FUNC
1705  * @tc.require:
1706  * @tc.author: zhangqiquan
1707  */
1708 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, GetWaterMarkInfo001, TestSize.Level0)
1709 {
1710     std::string dev;
1711     auto res = g_kvDelegatePtr->GetWatermarkInfo(dev);
1712     EXPECT_EQ(res.first, INVALID_ARGS);
1713     EXPECT_EQ(res.second.sendMark, 0u);
1714     EXPECT_EQ(res.second.receiveMark, 0u);
1715 
1716     dev = std::string(DBConstant::MAX_DEV_LENGTH + 1, 'a');
1717     res = g_kvDelegatePtr->GetWatermarkInfo(dev);
1718     EXPECT_EQ(res.first, INVALID_ARGS);
1719     EXPECT_EQ(res.second.sendMark, 0u);
1720     EXPECT_EQ(res.second.receiveMark, 0u);
1721 }
1722 
1723 /**
1724  * @tc.name: QuerySyncRetry001
1725  * @tc.desc: use sync retry sync use query pull by compress
1726  * @tc.type: FUNC
1727  * @tc.require:
1728  * @tc.author: zhangqiquan
1729  */
1730 HWTEST_F(DistributedDBSingleVerP2PSimpleSyncTest, QuerySyncRetry001, TestSize.Level3)
1731 {
1732     if (g_kvDelegatePtr != nullptr) {
1733         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1734         g_kvDelegatePtr = nullptr;
1735     }
1736     /**
1737      * @tc.steps: step1. open db use Compress
1738      * @tc.expected: step1, Pragma return OK.
1739      */
1740     KvStoreNbDelegate::Option option;
1741     option.isNeedCompressOnSync = true;
1742     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1743     ASSERT_TRUE(g_kvDelegateStatus == OK);
1744     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1745 
1746     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, QUERY_SYNC_MESSAGE);
1747     std::vector<std::string> devices;
1748     devices.push_back(g_deviceB->GetDeviceId());
1749 
1750     /**
1751      * @tc.steps: step2. set sync retry
1752      * @tc.expected: step2, Pragma return OK.
1753      */
1754     int pragmaData = 1;
1755     PragmaData input = static_cast<PragmaData>(&pragmaData);
1756     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1757 
1758     /**
1759      * @tc.steps: step3. deviceA call sync and wait
1760      * @tc.expected: step3. sync should return OK.
1761      */
1762     std::map<std::string, DBStatus> result;
1763     auto query = Query::Select().PrefixKey({});
1764     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query) == OK);
1765 
1766     /**
1767      * @tc.expected: step4. onComplete should be called, and status is time_out
1768      */
1769     ASSERT_TRUE(result.size() == devices.size());
1770     for (const auto &pair : result) {
1771         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1772         EXPECT_EQ(pair.second, OK);
1773     }
1774     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1775 }
1776 }