• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <condition_variable>
17 #include <gtest/gtest.h>
18 #include <thread>
19 
20 #include "db_constant.h"
21 #include "db_common.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "platform_specific.h"
27 #include "single_ver_data_sync.h"
28 #include "single_ver_kv_sync_task_context.h"
29 
30 using namespace testing::ext;
31 using namespace DistributedDB;
32 using namespace DistributedDBUnitTest;
33 using namespace std;
34 
35 namespace {
36     class TestSingleVerKvSyncTaskContext : public SingleVerKvSyncTaskContext {
37     public:
38         TestSingleVerKvSyncTaskContext() = default;
39     };
40     string g_testDir;
41     const string STORE_ID = "kv_stroe_sync_test";
42     const int64_t TIME_OFFSET = 5000000;
43     const int WAIT_TIME = 1000;
44     const int WAIT_5_SECONDS = 5000;
45     const int WAIT_30_SECONDS = 30000;
46     const int WAIT_36_SECONDS = 36000;
47     const std::string DEVICE_A = "real_device";
48     const std::string DEVICE_B = "deviceB";
49     const std::string DEVICE_C = "deviceC";
50     const std::string CREATE_SYNC_TABLE_SQL =
51     "CREATE TABLE IF NOT EXISTS sync_data(" \
52         "key         BLOB NOT NULL," \
53         "value       BLOB," \
54         "timestamp   INT  NOT NULL," \
55         "flag        INT  NOT NULL," \
56         "device      BLOB," \
57         "ori_device  BLOB," \
58         "hash_key    BLOB PRIMARY KEY NOT NULL," \
59         "w_timestamp INT);";
60 
61     KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
62     KvStoreConfig g_config;
63     DistributedDBToolsUnitTest g_tool;
64     DBStatus g_kvDelegateStatus = INVALID_ARGS;
65     KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
66     VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
67     KvVirtualDevice *g_deviceB = nullptr;
68     KvVirtualDevice *g_deviceC = nullptr;
69 
70     // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
71     auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
72         placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
73 
PullSyncTest()74     void PullSyncTest()
75     {
76         DBStatus status = OK;
77         std::vector<std::string> devices;
78         devices.push_back(g_deviceB->GetDeviceId());
79 
80         Key key = {'1'};
81         Key key2 = {'2'};
82         Value value = {'1'};
83         g_deviceB->PutData(key, value, 0, 0);
84         g_deviceB->PutData(key2, value, 1, 0);
85 
86         std::map<std::string, DBStatus> result;
87         status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
88         ASSERT_TRUE(status == OK);
89 
90         ASSERT_TRUE(result.size() == devices.size());
91         for (const auto &pair : result) {
92             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
93             EXPECT_TRUE(pair.second == OK);
94         }
95         Value value3;
96         EXPECT_EQ(g_kvDelegatePtr->Get(key, value3), OK);
97         EXPECT_EQ(value3, value);
98         EXPECT_EQ(g_kvDelegatePtr->Get(key2, value3), OK);
99         EXPECT_EQ(value3, value);
100     }
101 
CrudTest()102     void CrudTest()
103     {
104         vector<Entry> entries;
105         int totalSize = 10;
106         for (int i = 0; i < totalSize; i++) {
107             Entry entry;
108             entry.key.push_back(i);
109             entry.value.push_back('2');
110             entries.push_back(entry);
111         }
112         EXPECT_TRUE(g_kvDelegatePtr->PutBatch(entries) == OK);
113         for (const auto &entry : entries) {
114             Value resultvalue;
115             EXPECT_TRUE(g_kvDelegatePtr->Get(entry.key, resultvalue) == OK);
116             EXPECT_TRUE(resultvalue == entry.value);
117         }
118         for (int i = 0; i < totalSize / 2; i++) {
119             g_kvDelegatePtr->Delete(entries[i].key);
120             Value resultvalue;
121             EXPECT_TRUE(g_kvDelegatePtr->Get(entries[i].key, resultvalue) == NOT_FOUND);
122         }
123         for (int i = totalSize / 2; i < totalSize; i++) {
124             Value value = entries[i].value;
125             value.push_back('x');
126             EXPECT_TRUE(g_kvDelegatePtr->Put(entries[i].key, value) == OK);
127             Value resultvalue;
128             EXPECT_TRUE(g_kvDelegatePtr->Get(entries[i].key, resultvalue) == OK);
129             EXPECT_TRUE(resultvalue == value);
130         }
131     }
132 }
133 
134 class DistributedDBSingleVerP2PSyncTest : public testing::Test {
135 public:
136     static void SetUpTestCase(void);
137     static void TearDownTestCase(void);
138     void SetUp();
139     void TearDown();
140 };
141 
SetUpTestCase(void)142 void DistributedDBSingleVerP2PSyncTest::SetUpTestCase(void)
143 {
144     /**
145      * @tc.setup: Init datadir and Virtual Communicator.
146      */
147     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
148     g_config.dataDir = g_testDir;
149     g_mgr.SetKvStoreConfig(g_config);
150 
151     string dir = g_testDir + "/single_ver";
152     DIR* dirTmp = opendir(dir.c_str());
153     if (dirTmp == nullptr) {
154         OS::MakeDBDirectory(dir);
155     } else {
156         closedir(dirTmp);
157     }
158 
159     g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
160     ASSERT_TRUE(g_communicatorAggregator != nullptr);
161     RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
162 }
163 
TearDownTestCase(void)164 void DistributedDBSingleVerP2PSyncTest::TearDownTestCase(void)
165 {
166     /**
167      * @tc.teardown: Release virtual Communicator and clear data dir.
168      */
169     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
170         LOGE("rm test db files error!");
171     }
172     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
173 }
174 
SetUp(void)175 void DistributedDBSingleVerP2PSyncTest::SetUp(void)
176 {
177     DistributedDBToolsUnitTest::PrintTestCaseInfo();
178     /**
179      * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
180      */
181     KvStoreNbDelegate::Option option;
182     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
183     ASSERT_TRUE(g_kvDelegateStatus == OK);
184     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
185     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
186     ASSERT_TRUE(g_deviceB != nullptr);
187     VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
188     ASSERT_TRUE(syncInterfaceB != nullptr);
189     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
190 
191     g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
192     ASSERT_TRUE(g_deviceC != nullptr);
193     VirtualSingleVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
194     ASSERT_TRUE(syncInterfaceC != nullptr);
195     ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
196 
197     auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
198                                 const std::string &deviceId, uint8_t flag) -> bool {
199                                 return true;};
200     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
201 }
202 
TearDown(void)203 void DistributedDBSingleVerP2PSyncTest::TearDown(void)
204 {
205     /**
206      * @tc.teardown: Release device A, B, C
207      */
208     if (g_kvDelegatePtr != nullptr) {
209         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
210         g_kvDelegatePtr = nullptr;
211         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
212         LOGD("delete kv store status %d", status);
213         ASSERT_TRUE(status == OK);
214     }
215     if (g_deviceB != nullptr) {
216         delete g_deviceB;
217         g_deviceB = nullptr;
218     }
219     if (g_deviceC != nullptr) {
220         delete g_deviceC;
221         g_deviceC = nullptr;
222     }
223     PermissionCheckCallbackV2 nullCallback;
224     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
225 }
226 
227 /**
228  * @tc.name: Normal Sync 001
229  * @tc.desc: Test normal push sync for add data.
230  * @tc.type: FUNC
231  * @tc.require: AR000CQS3S SR000CQE0B
232  * @tc.author: xushaohua
233  */
234 HWTEST_F(DistributedDBSingleVerP2PSyncTest, NormalSync001, TestSize.Level1)
235 {
236     DBStatus status = OK;
237     std::vector<std::string> devices;
238     devices.push_back(g_deviceB->GetDeviceId());
239     devices.push_back(g_deviceC->GetDeviceId());
240 
241     /**
242      * @tc.steps: step1. deviceA put {k1, v1}
243      */
244     Key key = {'1'};
245     Value value = {'1'};
246     status = g_kvDelegatePtr->Put(key, value);
247     ASSERT_TRUE(status == OK);
248 
249     /**
250      * @tc.steps: step2. deviceA call sync and wait
251      * @tc.expected: step2. sync should return OK.
252      */
253     std::map<std::string, DBStatus> result;
254     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
255     ASSERT_TRUE(status == OK);
256 
257     /**
258      * @tc.expected: step2. onComplete should be called, DeviceB,C have {k1,v1}
259      */
260     ASSERT_TRUE(result.size() == devices.size());
261     for (const auto &pair : result) {
262         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
263         EXPECT_TRUE(pair.second == OK);
264     }
265     VirtualDataItem item;
266     g_deviceB->GetData(key, item);
267     EXPECT_TRUE(item.value == value);
268     g_deviceC->GetData(key, item);
269     EXPECT_TRUE(item.value == value);
270 }
271 
272 /**
273  * @tc.name: Normal Sync 002
274  * @tc.desc: Test normal push sync for update data.
275  * @tc.type: FUNC
276  * @tc.require: AR000CCPOM
277  * @tc.author: xushaohua
278  */
279 HWTEST_F(DistributedDBSingleVerP2PSyncTest, NormalSync002, TestSize.Level1)
280 {
281     DBStatus status = OK;
282     std::vector<std::string> devices;
283     devices.push_back(g_deviceB->GetDeviceId());
284     devices.push_back(g_deviceC->GetDeviceId());
285 
286     /**
287      * @tc.steps: step1. deviceA put {k1, v1}
288      */
289     Key key = {'1'};
290     Value value = {'1'};
291     status = g_kvDelegatePtr->Put(key, value);
292     ASSERT_TRUE(status == OK);
293 
294     /**
295      * @tc.steps: step2. deviceA put {k1, v2}
296      */
297     Value value2;
298     value2.push_back('2');
299     status = g_kvDelegatePtr->Put(key, value2);
300     ASSERT_TRUE(status == OK);
301 
302     /**
303      * @tc.steps: step3. deviceA call sync and wait
304      * @tc.expected: step3. sync should return OK.
305      */
306     std::map<std::string, DBStatus> result;
307     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
308     ASSERT_TRUE(status == OK);
309 
310     /**
311      * @tc.expected: step3. onComplete should be called, DeviceB,C have {k1,v2}
312      */
313     ASSERT_TRUE(result.size() == devices.size());
314     for (const auto &pair : result) {
315         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
316         EXPECT_TRUE(pair.second == OK);
317     }
318     VirtualDataItem item;
319     g_deviceC->GetData(key, item);
320     EXPECT_TRUE(item.value == value2);
321     g_deviceB->GetData(key, item);
322     EXPECT_TRUE(item.value == value2);
323 }
324 
325 /**
326  * @tc.name: Normal Sync 003
327  * @tc.desc: Test normal push sync for delete data.
328  * @tc.type: FUNC
329  * @tc.require: AR000CQS3S
330  * @tc.author: xushaohua
331  */
332 HWTEST_F(DistributedDBSingleVerP2PSyncTest, NormalSync003, TestSize.Level1)
333 {
334     DBStatus status = OK;
335     std::vector<std::string> devices;
336     devices.push_back(g_deviceB->GetDeviceId());
337     devices.push_back(g_deviceC->GetDeviceId());
338 
339     /**
340      * @tc.steps: step1. deviceA put {k1, v1}
341      */
342     Key key = {'1'};
343     Value value = {'1'};
344     status = g_kvDelegatePtr->Put(key, value);
345     ASSERT_TRUE(status == OK);
346 
347     /**
348      * @tc.steps: step2. deviceA delete k1
349      */
350     status = g_kvDelegatePtr->Delete(key);
351     ASSERT_TRUE(status == OK);
352     std::map<std::string, DBStatus> result;
353     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
354     ASSERT_TRUE(status == OK);
355 
356     /**
357      * @tc.steps: step3. deviceA call sync and wait
358      * @tc.expected: step3. sync should return OK.
359      */
360     ASSERT_TRUE(result.size() == devices.size());
361     for (const auto &pair : result) {
362         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
363         EXPECT_TRUE(pair.second == OK);
364     }
365 
366     /**
367      * @tc.expected: step3. onComplete should be called, DeviceB,C have {k1, delete}
368      */
369     VirtualDataItem item;
370     Key hashKey;
371     DistributedDBToolsUnitTest::CalcHash(key, hashKey);
372     EXPECT_EQ(g_deviceB->GetData(hashKey, item), -E_NOT_FOUND);
373     EXPECT_EQ(g_deviceC->GetData(hashKey, item), -E_NOT_FOUND);
374 }
375 
376 /**
377  * @tc.name: Normal Sync 004
378  * @tc.desc: Test normal pull sync for add data.
379  * @tc.type: FUNC
380  * @tc.require: AR000CCPOM
381  * @tc.author: xushaohua
382  */
383 HWTEST_F(DistributedDBSingleVerP2PSyncTest, NormalSync004, TestSize.Level1)
384 {
385     DBStatus status = OK;
386     std::vector<std::string> devices;
387     devices.push_back(g_deviceB->GetDeviceId());
388     devices.push_back(g_deviceC->GetDeviceId());
389 
390     /**
391      * @tc.steps: step1. deviceB put {k1, v1}
392      */
393     Key key = {'1'};
394     Value value = {'1'};
395     g_deviceB->PutData(key, value, 0, 0);
396 
397     /**
398      * @tc.steps: step2. deviceB put {k2, v2}
399      */
400     Key key2 = {'2'};
401     Value value2 = {'2'};
402     g_deviceC->PutData(key2, value2, 0, 0);
403     ASSERT_TRUE(status == OK);
404 
405     /**
406      * @tc.steps: step3. deviceA call pull sync
407      * @tc.expected: step3. sync should return OK.
408      */
409     std::map<std::string, DBStatus> result;
410     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
411     ASSERT_TRUE(status == OK);
412 
413     /**
414      * @tc.expected: step3. onComplete should be called, DeviceA have {k1, VALUE_1}, {K2. VALUE_2}
415      */
416     ASSERT_TRUE(result.size() == devices.size());
417     for (const auto &pair : result) {
418         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
419         EXPECT_TRUE(pair.second == OK);
420     }
421     Value value3;
422     EXPECT_EQ(g_kvDelegatePtr->Get(key, value3), OK);
423     EXPECT_EQ(value3, value);
424     EXPECT_EQ(g_kvDelegatePtr->Get(key2, value3), OK);
425     EXPECT_EQ(value3, value2);
426 }
427 
428 /**
429  * @tc.name: Normal Sync 005
430  * @tc.desc: Test normal pull sync for update data.
431  * @tc.type: FUNC
432  * @tc.require: AR000CCPOM SR000CQE10
433  * @tc.author: xushaohua
434  */
435 HWTEST_F(DistributedDBSingleVerP2PSyncTest, NormalSync005, TestSize.Level2)
436 {
437     DBStatus status = OK;
438     std::vector<std::string> devices;
439     devices.push_back(g_deviceB->GetDeviceId());
440     devices.push_back(g_deviceC->GetDeviceId());
441 
442     /**
443      * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
444      */
445     Key key1 = {'1'};
446     Value value1 = {'1'};
447     status = g_kvDelegatePtr->Put(key1, value1);
448     ASSERT_TRUE(status == OK);
449     Key key2 = {'2'};
450     Value value2 = {'2'};
451     status = g_kvDelegatePtr->Put(key2, value2);
452     ASSERT_TRUE(status == OK);
453 
454     /**
455      * @tc.steps: step2. deviceB put {k1, v3} t2, t2 > t1
456      */
457     Value value3;
458     value3.push_back('3');
459     g_deviceB->PutData(key1, value3,
460         TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 0);
461 
462     /**
463      * @tc.steps: step3. deviceC put {k2, v4} t2, t4 < t1
464      */
465     Value value4;
466     value4.push_back('4');
467     g_deviceC->PutData(key2, value4,
468         TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 0);
469 
470     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
471     /**
472      * @tc.steps: step4. deviceA call pull sync
473      * @tc.expected: step4. sync should return OK.
474      */
475     std::map<std::string, DBStatus> result;
476     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
477     ASSERT_TRUE(status == OK);
478 
479     /**
480      * @tc.expected: step4. onComplete should be called, DeviceA have {k1, v3}, {k2. v2}
481      */
482     ASSERT_TRUE(result.size() == devices.size());
483     for (const auto &pair : result) {
484         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
485         EXPECT_TRUE(pair.second == OK);
486     }
487 
488     Value value5;
489     g_kvDelegatePtr->Get(key1, value5);
490     EXPECT_TRUE(value5 == value3);
491     g_kvDelegatePtr->Get(key2, value5);
492     EXPECT_TRUE(value5 == value2);
493 }
494 
495 /**
496  * @tc.name: Normal Sync 006
497  * @tc.desc: Test normal pull sync for delete data.
498  * @tc.type: FUNC
499  * @tc.require: AR000CQS3S
500  * @tc.author: xushaohua
501  */
502 HWTEST_F(DistributedDBSingleVerP2PSyncTest, NormalSync006, TestSize.Level2)
503 {
504     /**
505      * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
506      */
507     Key key1 = {'1'};
508     Value value1 = {'1'};
509     DBStatus status = g_kvDelegatePtr->Put(key1, value1);
510     ASSERT_TRUE(status == OK);
511     Key key2 = {'2'};
512     Value value2 = {'2'};
513     status = g_kvDelegatePtr->Put(key2, value2);
514     ASSERT_TRUE(status == OK);
515 
516     /**
517      * @tc.steps: step2. deviceA put {k1, delete} t2, t2 <t1
518      */
519     Key hashKey1;
520     DistributedDBToolsUnitTest::CalcHash(key1, hashKey1);
521     g_deviceB->PutData(hashKey1, value1,
522         TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 1);
523 
524     /**
525      * @tc.steps: step3. deviceA put {k1, delete} t3, t3 < t1
526      */
527     Key hashKey2;
528     DistributedDBToolsUnitTest::CalcHash(key2, hashKey2);
529     g_deviceC->PutData(hashKey2, value1,
530         TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 0);
531 
532     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
533     /**
534      * @tc.steps: step4. deviceA call pull sync
535      * @tc.expected: step4. sync should return OK.
536      */
537     std::map<std::string, DBStatus> result;
538     std::vector<std::string> devices;
539     devices.push_back(g_deviceB->GetDeviceId());
540     devices.push_back(g_deviceC->GetDeviceId());
541     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
542     ASSERT_TRUE(status == OK);
543 
544     /**
545      * @tc.expected: step4. onComplete should be called, DeviceA have {k2. v2} don't have k1
546      */
547     ASSERT_TRUE(result.size() == devices.size());
548     for (const auto &pair : result) {
549         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
550         EXPECT_TRUE(pair.second == OK);
551     }
552     Value value5;
553     g_kvDelegatePtr->Get(key1, value5);
554     EXPECT_TRUE(value5.empty());
555     g_kvDelegatePtr->Get(key2, value5);
556     EXPECT_TRUE(value5 == value2);
557 }
558 
559 /**
560  * @tc.name: Normal Sync 007
561  * @tc.desc: Test normal push_pull sync for add data.
562  * @tc.type: FUNC
563  * @tc.require: AR000CCPOM
564  * @tc.author: xushaohua
565  */
566 HWTEST_F(DistributedDBSingleVerP2PSyncTest, NormalSync007, TestSize.Level1)
567 {
568     DBStatus status = OK;
569     std::vector<std::string> devices;
570     devices.push_back(g_deviceB->GetDeviceId());
571     devices.push_back(g_deviceC->GetDeviceId());
572 
573     /**
574      * @tc.steps: step1. deviceA put {k1, v1}
575      */
576     Key key1 = {'1'};
577     Value value1 = {'1'};
578     status = g_kvDelegatePtr->Put(key1, value1);
579     EXPECT_TRUE(status == OK);
580 
581     /**
582      * @tc.steps: step1. deviceB put {k2, v2}
583      */
584     Key key2 = {'2'};
585     Value value2 = {'2'};
586     g_deviceB->PutData(key2, value2, 0, 0);
587 
588     /**
589      * @tc.steps: step1. deviceB put {k3, v3}
590      */
591     Key key3 = {'3'};
592     Value value3 = {'3'};
593     g_deviceC->PutData(key3, value3, 0, 0);
594 
595     /**
596      * @tc.steps: step4. deviceA call push_pull sync
597      * @tc.expected: step4. sync should return OK.
598      */
599     std::map<std::string, DBStatus> result;
600     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
601     ASSERT_TRUE(status == OK);
602 
603     ASSERT_TRUE(result.size() == devices.size());
604     for (const auto &pair : result) {
605         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
606         EXPECT_TRUE(pair.second == OK);
607     }
608 
609     /**
610      * @tc.expected: step4. onComplete should be called, DeviceA have {k1. v1}, {k2, v2}, {k3, v3}
611      *     deviceB received {k1. v1}, don't received k3, deviceC received {k1. v1}, don't received k2
612      */
613     Value value4;
614     g_kvDelegatePtr->Get(key2, value4);
615     EXPECT_TRUE(value4 == value2);
616     g_kvDelegatePtr->Get(key3, value4);
617     EXPECT_TRUE(value4 == value3);
618 
619     VirtualDataItem item1;
620     g_deviceB->GetData(key1, item1);
621     EXPECT_TRUE(item1.value == value1);
622     item1.value.clear();
623     g_deviceB->GetData(key3, item1);
624     EXPECT_TRUE(item1.value.empty());
625 
626     VirtualDataItem item2;
627     g_deviceC->GetData(key1, item2);
628     EXPECT_TRUE(item2.value == value1);
629     item2.value.clear();
630     g_deviceC->GetData(key2, item2);
631     EXPECT_TRUE(item2.value.empty());
632 }
633 
634 /**
635  * @tc.name: Normal Sync 008
636  * @tc.desc: Test normal push_pull sync for update data.
637  * @tc.type: FUNC
638  * @tc.require: AR000CCPOM
639  * @tc.author: xushaohua
640  */
641 HWTEST_F(DistributedDBSingleVerP2PSyncTest, NormalSync008, TestSize.Level2)
642 {
643     DBStatus status = OK;
644     std::vector<std::string> devices;
645     devices.push_back(g_deviceB->GetDeviceId());
646     devices.push_back(g_deviceC->GetDeviceId());
647 
648     /**
649      * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
650      */
651     Key key1 = {'1'};
652     Value value1 = {'1'};
653     status = g_kvDelegatePtr->Put(key1, value1);
654     ASSERT_TRUE(status == OK);
655 
656     Key key2 = {'2'};
657     Value value2 = {'2'};
658     status = g_kvDelegatePtr->Put(key2, value2);
659     ASSERT_TRUE(status == OK);
660 
661     /**
662      * @tc.steps: step2. deviceB put {k1, v3} t2, t2 > t1
663      */
664     Value value3 = {'3'};
665     g_deviceB->PutData(key1, value3,
666         TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 0);
667 
668     /**
669      * @tc.steps: step3. deviceB put {k1, v4} t3, t4 <t1
670      */
671     Value value4 = {'4'};
672     g_deviceC->PutData(key2, value4,
673         TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 0);
674     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
675 
676     /**
677      * @tc.steps: step4. deviceA call push_pull sync
678      * @tc.expected: step4. sync should return OK.
679      */
680     std::map<std::string, DBStatus> result;
681     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
682     ASSERT_TRUE(status == OK);
683     ASSERT_TRUE(result.size() == devices.size());
684     for (const auto &pair : result) {
685         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
686         EXPECT_TRUE(pair.second == OK);
687     }
688 
689     /**
690      * @tc.expected: step4. onComplete should be called, DeviceA have {k1. v3}, {k2, v2}
691      *     deviceB have {k1. v3}, deviceC have {k2. v2}
692      */
693     Value value5;
694     g_kvDelegatePtr->Get(key1, value5);
695     EXPECT_EQ(value5, value3);
696     g_kvDelegatePtr->Get(key2, value5);
697     EXPECT_EQ(value5, value2);
698 
699     VirtualDataItem item1;
700     g_deviceB->GetData(key1, item1);
701     EXPECT_TRUE(item1.value == value3);
702     item1.value.clear();
703     g_deviceB->GetData(key2, item1);
704     EXPECT_TRUE(item1.value == value2);
705 
706     VirtualDataItem item2;
707     g_deviceC->GetData(key2, item2);
708     EXPECT_TRUE(item2.value == value2);
709 }
710 
711 /**
712  * @tc.name: Normal Sync 009
713  * @tc.desc: Test normal push_pull sync for delete data.
714  * @tc.type: FUNC
715  * @tc.require: AR000CCPOM
716  * @tc.author: xushaohua
717  */
718 HWTEST_F(DistributedDBSingleVerP2PSyncTest, NormalSync009, TestSize.Level2)
719 {
720     DBStatus status = OK;
721     std::vector<std::string> devices;
722     devices.push_back(g_deviceB->GetDeviceId());
723     devices.push_back(g_deviceC->GetDeviceId());
724 
725     /**
726      * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2} t1
727      */
728     Key key1 = {'1'};
729     Value value1 = {'1'};
730     status = g_kvDelegatePtr->Put(key1, value1);
731     ASSERT_TRUE(status == OK);
732 
733     Key key2 = {'2'};
734     Value value2 = {'2'};
735     status = g_kvDelegatePtr->Put(key2, value2);
736     ASSERT_TRUE(status == OK);
737 
738     /**
739      * @tc.steps: step2. deviceB put {k1, delete} t2, t2 > t1
740      */
741     Key hashKey1;
742     DistributedDBToolsUnitTest::CalcHash(key1, hashKey1);
743     g_deviceB->PutData(hashKey1, value1,
744         TimeHelper::GetSysCurrentTime() + g_deviceB->GetLocalTimeOffset() + TIME_OFFSET, 1);
745 
746     /**
747      * @tc.steps: step3. deviceB put {k1, delete} t3, t2 < t1
748      */
749     Key hashKey2;
750     DistributedDBToolsUnitTest::CalcHash(key2, hashKey2);
751     g_deviceC->PutData(hashKey2, value2,
752         TimeHelper::GetSysCurrentTime() + g_deviceC->GetLocalTimeOffset() - TIME_OFFSET, 1);
753 
754     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
755     /**
756      * @tc.steps: step4. deviceA call push_pull sync
757      * @tc.expected: step4. sync should return OK.
758      */
759     std::map<std::string, DBStatus> result;
760     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
761     ASSERT_TRUE(status == OK);
762 
763     /**
764      * @tc.expected: step4. onComplete should be called, DeviceA have {k1. delete}, {k2, v2}
765      *     deviceB have {k2. v2}, deviceC have {k2. v2}
766      */
767     ASSERT_TRUE(result.size() == devices.size());
768     for (const auto &pair : result) {
769         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
770         EXPECT_TRUE(pair.second == OK);
771     }
772 
773     Value value3;
774     g_kvDelegatePtr->Get(key1, value3);
775     EXPECT_TRUE(value3.empty());
776     value3.clear();
777     g_kvDelegatePtr->Get(key2, value3);
778     EXPECT_TRUE(value3 == value2);
779 
780     VirtualDataItem item1;
781     g_deviceB->GetData(key2, item1);
782     EXPECT_TRUE(item1.value == value2);
783 
784     VirtualDataItem item2;
785     g_deviceC->GetData(key2, item2);
786     EXPECT_TRUE(item2.value == value2);
787 }
788 
789 /**
790  * @tc.name: Normal Sync 010
791  * @tc.desc: Test sync failed by invalid devices.
792  * @tc.type: FUNC
793  * @tc.require: AR000CCPOM
794  * @tc.author: zhangqiquan
795  */
796 HWTEST_F(DistributedDBSingleVerP2PSyncTest, NormalSync010, TestSize.Level1)
797 {
798     DBStatus status = OK;
799     std::vector<std::string> devices;
800     devices.push_back(DEVICE_A);
801     devices.push_back(g_deviceB->GetDeviceId());
802 
803     std::map<std::string, DBStatus> result;
804     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
805     ASSERT_TRUE(status == OK);
806 
807     ASSERT_EQ(result.size(), devices.size());
808     EXPECT_EQ(result[DEVICE_A], INVALID_ARGS);
809     EXPECT_EQ(result[DEVICE_B], OK);
810 }
811 
812 /**
813  * @tc.name: Limit Data Sync 001
814  * @tc.desc: Test sync limit key and value data
815  * @tc.type: FUNC
816  * @tc.require: AR000CCPOM
817  * @tc.author: xushaohua
818  */
819 HWTEST_F(DistributedDBSingleVerP2PSyncTest, LimitDataSync001, TestSize.Level1)
820 {
821     DBStatus status = OK;
822     std::vector<std::string> devices;
823     devices.push_back(g_deviceB->GetDeviceId());
824 
825     Key key1;
826     Value value1;
827     DistributedDBToolsUnitTest::GetRandomKeyValue(key1, DBConstant::MAX_KEY_SIZE + 1);
828     DistributedDBToolsUnitTest::GetRandomKeyValue(value1, DBConstant::MAX_VALUE_SIZE + 1);
829 
830     Key key2;
831     Value value2;
832     DistributedDBToolsUnitTest::GetRandomKeyValue(key2, DBConstant::MAX_KEY_SIZE);
833     DistributedDBToolsUnitTest::GetRandomKeyValue(value2, DBConstant::MAX_VALUE_SIZE);
834 
835     /**
836      * @tc.steps: step1. deviceB put {k1, v1}, K1 > 1k, v1 > 4M
837      */
838     g_deviceB->PutData(key1, value1, 0, 0);
839 
840     /**
841      * @tc.steps: step2. deviceB put {k2, v2}, K2 = 1k, v2 = 4M
842      */
843     g_deviceC->PutData(key2, value2, 0, 0);
844 
845     /**
846      * @tc.steps: step3. deviceA call pull sync from device B
847      * @tc.expected: step3. sync should return OK.
848      */
849     std::map<std::string, DBStatus> result;
850     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
851     ASSERT_TRUE(status == OK);
852 
853     /**
854      * @tc.expected: step3. onComplete should be called.
855      */
856     ASSERT_TRUE(result.size() == devices.size());
857     for (const auto &pair : result) {
858         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
859         if (pair.first == g_deviceB->GetDeviceId()) {
860             EXPECT_TRUE(pair.second != OK);
861         } else {
862             EXPECT_TRUE(pair.second == OK);
863         }
864     }
865 
866     /**
867      * @tc.steps: step4. deviceA call pull sync from deviceC
868      * @tc.expected: step4. sync should return OK.
869      */
870     devices.clear();
871     result.clear();
872     devices.push_back(g_deviceC->GetDeviceId());
873     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
874     ASSERT_TRUE(status == OK);
875 
876     /**
877      * @tc.expected: step4. onComplete should be called, DeviceA have {k2. v2}, don't have {k1, v1}
878      */
879     ASSERT_TRUE(result.size() == devices.size());
880     for (const auto &pair : result) {
881         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
882         EXPECT_TRUE(pair.second == OK);
883     }
884 
885     // Get value from A
886     Value valueRead;
887     EXPECT_TRUE(g_kvDelegatePtr->Get(key1, valueRead) != OK);
888     valueRead.clear();
889     EXPECT_EQ(g_kvDelegatePtr->Get(key2, valueRead), OK);
890     EXPECT_TRUE(valueRead == value2);
891 }
892 
893 /**
894  * @tc.name: Device Offline Sync 001
895  * @tc.desc: Test push sync when device offline
896  * @tc.type: FUNC
897  * @tc.require: AR000CCPOM
898  * @tc.author: xushaohua
899  */
900 HWTEST_F(DistributedDBSingleVerP2PSyncTest, DeviceOfflineSync001, TestSize.Level1)
901 {
902     std::vector<std::string> devices;
903     devices.push_back(g_deviceB->GetDeviceId());
904     devices.push_back(g_deviceC->GetDeviceId());
905 
906     /**
907      * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2}, {k3 delete}, {k4,v2}
908      */
909     Key key1 = {'1'};
910     Value value1 = {'1'};
911     ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value1) == OK);
912 
913     Key key2 = {'2'};
914     Value value2 = {'2'};
915     ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value2) == OK);
916 
917     Key key3 = {'3'};
918     Value value3 = {'3'};
919     ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value3) == OK);
920     ASSERT_TRUE(g_kvDelegatePtr->Delete(key3) == OK);
921 
922     Key key4 = {'4'};
923     Value value4 = {'4'};
924     ASSERT_TRUE(g_kvDelegatePtr->Put(key4, value4) == OK);
925 
926     /**
927      * @tc.steps: step2. deviceB offline
928      */
929     g_deviceB->Offline();
930 
931     /**
932      * @tc.steps: step3. deviceA call pull sync
933      * @tc.expected: step3. sync should return OK.
934      */
935     std::map<std::string, DBStatus> result;
936     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
937     ASSERT_TRUE(status == OK);
938 
939     /**
940      * @tc.expected: step3. onComplete should be called, DeviceB status is timeout
941      *     deviceC has {k1, v1}, {k2, v2}, {k3 delete}, {k4,v4}
942      */
943     for (const auto &pair : result) {
944         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
945         if (pair.first == DEVICE_B) {
946             EXPECT_TRUE(pair.second == COMM_FAILURE);
947         } else {
948             EXPECT_TRUE(pair.second == OK);
949         }
950     }
951     VirtualDataItem item;
952     g_deviceC->GetData(key1, item);
953     EXPECT_TRUE(item.value == value1);
954     item.value.clear();
955     g_deviceC->GetData(key2, item);
956     EXPECT_TRUE(item.value == value2);
957     item.value.clear();
958     Key hashKey;
959     DistributedDBToolsUnitTest::CalcHash(key3, hashKey);
960     EXPECT_TRUE(g_deviceC->GetData(hashKey, item) == -E_NOT_FOUND);
961     item.value.clear();
962     g_deviceC->GetData(key4, item);
963     EXPECT_TRUE(item.value == value4);
964 }
965 
966 /**
967  * @tc.name: Device Offline Sync 002
968  * @tc.desc: Test pull sync when device offline
969  * @tc.type: FUNC
970  * @tc.require: AR000CCPOM
971  * @tc.author: xushaohua
972  */
973 HWTEST_F(DistributedDBSingleVerP2PSyncTest, DeviceOfflineSync002, TestSize.Level1)
974 {
975     std::vector<std::string> devices;
976     devices.push_back(g_deviceB->GetDeviceId());
977     devices.push_back(g_deviceC->GetDeviceId());
978 
979     /**
980      * @tc.steps: step1. deviceB put {k1, v1}
981      */
982     Key key1 = {'1'};
983     Value value1 = {'1'};
984     g_deviceB->PutData(key1, value1, 0, 0);
985 
986     /**
987      * @tc.steps: step2. deviceB offline
988      */
989     g_deviceB->Offline();
990 
991     /**
992      * @tc.steps: step3. deviceC put {k2, v2}, {k3, delete}, {k4, v4}
993      */
994     Key key2 = {'2'};
995     Value value2 = {'2'};
996     g_deviceC->PutData(key2, value2, 0, 0);
997 
998     Key key3 = {'3'};
999     Value value3 = {'3'};
1000     g_deviceC->PutData(key3, value3, 0, 1);
1001 
1002     Key key4 = {'4'};
1003     Value value4 = {'4'};
1004     g_deviceC->PutData(key4, value4, 0, 0);
1005 
1006     /**
1007      * @tc.steps: step2. deviceA call pull sync
1008      * @tc.expected: step2. sync should return OK.
1009      */
1010     std::map<std::string, DBStatus> result;
1011     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
1012     ASSERT_TRUE(status == OK);
1013 
1014     /**
1015      * @tc.expected: step3. onComplete should be called, DeviceB status is timeout
1016      *     deviceA has {k2, v2}, {k3 delete}, {k4,v4}
1017      */
1018     for (const auto &pair : result) {
1019         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1020         if (pair.first == DEVICE_B) {
1021             EXPECT_TRUE(pair.second == COMM_FAILURE);
1022         } else {
1023             EXPECT_TRUE(pair.second == OK);
1024         }
1025     }
1026 
1027     Value value5;
1028     EXPECT_TRUE(g_kvDelegatePtr->Get(key1, value5) != OK);
1029     g_kvDelegatePtr->Get(key2, value5);
1030     EXPECT_EQ(value5, value2);
1031     EXPECT_TRUE(g_kvDelegatePtr->Get(key3, value5) != OK);
1032     g_kvDelegatePtr->Get(key4, value5);
1033     EXPECT_EQ(value5, value4);
1034 }
1035 
1036 /**
1037  * @tc.name: Auto Sync 001
1038  * @tc.desc: Verify auto sync enable function.
1039  * @tc.type: FUNC
1040  * @tc.require: AR000CKRTD AR000CQE0E
1041  * @tc.author: xushaohua
1042  */
1043 HWTEST_F(DistributedDBSingleVerP2PSyncTest, AutoSync001, TestSize.Level1)
1044 {
1045     std::vector<std::string> devices;
1046     devices.push_back(g_deviceB->GetDeviceId());
1047     devices.push_back(g_deviceC->GetDeviceId());
1048 
1049     /**
1050      * @tc.steps: step1. enable auto sync
1051      * @tc.expected: step1, Pragma return OK.
1052      */
1053     bool autoSync = true;
1054     PragmaData data = static_cast<PragmaData>(&autoSync);
1055     DBStatus status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1056     ASSERT_EQ(status, OK);
1057 
1058     /**
1059      * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
1060      */
1061     ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
1062     ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_2, VALUE_2) == OK);
1063 
1064     /**
1065      * @tc.steps: step3. sleep for data sync
1066      * @tc.expected: step3. deviceB,C has {k1, v1}, {k2, v2}
1067      */
1068     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1069     VirtualDataItem item;
1070     g_deviceB->GetData(KEY_1, item);
1071     EXPECT_EQ(item.value, VALUE_1);
1072     g_deviceB->GetData(KEY_2, item);
1073     EXPECT_EQ(item.value, VALUE_2);
1074     g_deviceC->GetData(KEY_1, item);
1075     EXPECT_EQ(item.value, VALUE_1);
1076     g_deviceC->GetData(KEY_2, item);
1077     EXPECT_EQ(item.value, VALUE_2);
1078 }
1079 
1080 /**
1081  * @tc.name: Auto Sync 002
1082  * @tc.desc: Verify auto sync disable function.
1083  * @tc.type: FUNC
1084  * @tc.require: AR000CKRTD AR000CQE0E
1085  * @tc.author: xushaohua
1086  */
1087 HWTEST_F(DistributedDBSingleVerP2PSyncTest, AutoSync002, TestSize.Level1)
1088 {
1089     std::vector<std::string> devices;
1090     devices.push_back(g_deviceB->GetDeviceId());
1091     devices.push_back(g_deviceC->GetDeviceId());
1092 
1093     /**
1094      * @tc.steps: step1. disable auto sync
1095      * @tc.expected: step1, Pragma return OK.
1096      */
1097     bool autoSync = false;
1098     PragmaData data = static_cast<PragmaData>(&autoSync);
1099     DBStatus status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1100     ASSERT_EQ(status, OK);
1101 
1102 
1103     /**
1104      * @tc.steps: step2. deviceB put {k1, v1}, deviceC put {k2, v2}
1105      */
1106     g_deviceB->PutData(KEY_1, VALUE_1, 0, 0);
1107     g_deviceC->PutData(KEY_2, VALUE_2, 0, 0);
1108 
1109     /**
1110      * @tc.steps: step3. sleep for data sync
1111      * @tc.expected: step3. deviceA don't have k1, k2.
1112      */
1113     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1114     Value value3;
1115     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_1, value3) == NOT_FOUND);
1116     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_2, value3) == NOT_FOUND);
1117 }
1118 
1119 /**
1120  * @tc.name: Block Sync 001
1121  * @tc.desc: Verify block push sync function.
1122  * @tc.type: FUNC
1123  * @tc.require: AR000CKRTD AR000CQE0E
1124  * @tc.author: xushaohua
1125  */
1126 HWTEST_F(DistributedDBSingleVerP2PSyncTest, BlockSync001, TestSize.Level1)
1127 {
1128     std::vector<std::string> devices;
1129     devices.push_back(g_deviceB->GetDeviceId());
1130     devices.push_back(g_deviceC->GetDeviceId());
1131 
1132     /**
1133      * @tc.steps: step1. deviceA put {k1, v1}
1134      */
1135     g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1136 
1137     /**
1138      * @tc.steps: step2. deviceA call block push sync to deviceB & deviceC.
1139      * @tc.expected: step2. Sync return OK, devices status OK, deviceB & deivceC has {k1, v1}.
1140      */
1141     std::map<std::string, DBStatus> result;
1142     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1143     ASSERT_EQ(status, OK);
1144     ASSERT_TRUE(result.size() == devices.size());
1145     for (const auto &pair : result) {
1146         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1147         EXPECT_TRUE(pair.second == OK);
1148     }
1149     VirtualDataItem item1;
1150     EXPECT_EQ(g_deviceB->GetData(KEY_1, item1), OK);
1151     EXPECT_EQ(item1.value, VALUE_1);
1152     VirtualDataItem item2;
1153     EXPECT_EQ(g_deviceC->GetData(KEY_1, item2), OK);
1154     EXPECT_EQ(item2.value, VALUE_1);
1155 }
1156 
1157 /**
1158  * @tc.name:  Block Sync 002
1159  * @tc.desc: Verify block pull sync function.
1160  * @tc.type: FUNC
1161  * @tc.require: AR000CKRTD AR000CQE0E
1162  * @tc.author: xushaohua
1163  */
1164 HWTEST_F(DistributedDBSingleVerP2PSyncTest, BlockSync002, TestSize.Level1)
1165 {
1166     std::vector<std::string> devices;
1167     devices.push_back(g_deviceB->GetDeviceId());
1168     devices.push_back(g_deviceC->GetDeviceId());
1169 
1170     /**
1171      * @tc.steps: step1. deviceB put {k1, v1}, deviceC put {k2, v2}
1172      */
1173     g_deviceB->PutData(KEY_1, VALUE_1, 0, 0);
1174     g_deviceC->PutData(KEY_2, VALUE_2, 0, 0);
1175 
1176     /**
1177      * @tc.steps: step2. deviceA call block pull and pull sync to deviceB & deviceC.
1178      * @tc.expected: step2. Sync return OK, devices status OK, deviceA has {k1, v1}, {k2, v2}
1179      */
1180     std::map<std::string, DBStatus> result;
1181     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1182     ASSERT_EQ(status, OK);
1183     ASSERT_TRUE(result.size() == devices.size());
1184     for (const auto &pair : result) {
1185         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1186         EXPECT_TRUE(pair.second == OK);
1187     }
1188     Value value3;
1189     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_1, value3) == OK);
1190     EXPECT_TRUE(value3 == VALUE_1);
1191     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_2, value3) == OK);
1192     EXPECT_TRUE(value3 == VALUE_2);
1193 }
1194 
1195 /**
1196  * @tc.name:  Block Sync 003
1197  * @tc.desc: Verify block push and pull sync function.
1198  * @tc.type: FUNC
1199  * @tc.require: AR000CKRTD AR000CQE0E
1200  * @tc.author: xushaohua
1201  */
1202 HWTEST_F(DistributedDBSingleVerP2PSyncTest, BlockSync003, TestSize.Level1)
1203 {
1204     std::vector<std::string> devices;
1205     devices.push_back(g_deviceB->GetDeviceId());
1206     devices.push_back(g_deviceC->GetDeviceId());
1207 
1208     /**
1209      * @tc.steps: step1. deviceA put {k1, v1}
1210      */
1211     g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1212 
1213     /**
1214      * @tc.steps: step2. deviceB put {k1, v1}, deviceB put {k2, v2}
1215      */
1216     g_deviceB->PutData(KEY_2, VALUE_2, 0, 0);
1217     g_deviceC->PutData(KEY_3, VALUE_3, 0, 0);
1218 
1219     /**
1220      * @tc.steps: step3. deviceA call block pull and pull sync to deviceB & deviceC.
1221      * @tc.expected: step3. Sync return OK, devices status OK, deviceA has {k1, v1}, {k2, v2} {k3, v3}
1222      *      deviceB has {k1, v1}, {k2. v2} , deviceC has {k1, v1}, {k3, v3}
1223      */
1224     std::map<std::string, DBStatus> result;
1225     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, true);
1226     ASSERT_EQ(status, OK);
1227     ASSERT_TRUE(result.size() == devices.size());
1228     for (const auto &pair : result) {
1229         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1230         EXPECT_TRUE(pair.second == OK);
1231     }
1232 
1233     VirtualDataItem item1;
1234     g_deviceB->GetData(KEY_1, item1);
1235     EXPECT_TRUE(item1.value == VALUE_1);
1236     g_deviceB->GetData(KEY_2, item1);
1237     EXPECT_TRUE(item1.value == VALUE_2);
1238 
1239     VirtualDataItem item2;
1240     g_deviceC->GetData(KEY_1, item2);
1241     EXPECT_TRUE(item2.value == VALUE_1);
1242     g_deviceC->GetData(KEY_3, item2);
1243     EXPECT_TRUE(item2.value == VALUE_3);
1244 
1245     Value value3;
1246     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_1, value3) == OK);
1247     EXPECT_TRUE(value3 == VALUE_1);
1248     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_2, value3) == OK);
1249     EXPECT_TRUE(value3 == VALUE_2);
1250     EXPECT_TRUE(g_kvDelegatePtr->Get(KEY_3, value3) == OK);
1251     EXPECT_TRUE(value3 == VALUE_3);
1252 }
1253 
1254 /**
1255  * @tc.name:  Block Sync 004
1256  * @tc.desc: Verify block sync function invalid args.
1257  * @tc.type: FUNC
1258  * @tc.require: AR000CKRTD AR000CQE0E
1259  * @tc.author: xushaohua
1260  */
1261 HWTEST_F(DistributedDBSingleVerP2PSyncTest, BlockSync004, TestSize.Level2)
1262 {
1263     std::vector<std::string> devices;
1264 
1265     /**
1266      * @tc.steps: step1. deviceA put {k1, v1}
1267      */
1268     g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1269 
1270     /**
1271      * @tc.steps: step2. deviceA call block push sync to deviceB & deviceC.
1272      * @tc.expected: step2. Sync return INVALID_ARGS
1273      */
1274     std::map<std::string, DBStatus> result;
1275     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1276     EXPECT_EQ(status, INVALID_ARGS);
1277 
1278     /**
1279      * @tc.steps: step3. deviceB, deviceC offlinem and push deviceA sync to deviceB and deviceC.
1280      * @tc.expected: step3. Sync return OK, but the deviceB and deviceC are TIME_OUT
1281      */
1282     devices.push_back(g_deviceB->GetDeviceId());
1283     devices.push_back(g_deviceC->GetDeviceId());
1284     g_deviceB->Offline();
1285     g_deviceC->Offline();
1286 
1287     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1288     EXPECT_EQ(status, OK);
1289     ASSERT_TRUE(result.size() == devices.size());
1290     for (const auto &pair : result) {
1291         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1292         EXPECT_TRUE(pair.second == COMM_FAILURE);
1293     }
1294 }
1295 
1296 /**
1297  * @tc.name:  Block Sync 005
1298  * @tc.desc: Verify block sync function busy.
1299  * @tc.type: FUNC
1300  * @tc.require: AR000CKRTD AR000CQE0E
1301  * @tc.author: xushaohua
1302  */
1303 HWTEST_F(DistributedDBSingleVerP2PSyncTest, BlockSync005, TestSize.Level2)
1304 {
1305     std::vector<std::string> devices;
1306     devices.push_back(g_deviceB->GetDeviceId());
1307     devices.push_back(g_deviceC->GetDeviceId());
1308     /**
1309      * @tc.steps: step1. deviceA put {k1, v1}
1310      */
1311     g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1312 
1313     /**
1314      * @tc.steps: step2. New a thread to deviceA call block push sync to deviceB & deviceC,
1315      *      but deviceB & C is blocked
1316      * @tc.expected: step2. Sync will be blocked util timeout, and then return OK
1317      */
1318     g_deviceB->Offline();
1319     g_deviceC->Offline();
__anone9df75cb0302()1320     thread thread([devices](){
1321         std::map<std::string, DBStatus> resultInner;
1322         DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, resultInner, true);
1323         EXPECT_EQ(status, OK);
1324     });
1325     thread.detach();
1326     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1327     /**
1328      * @tc.steps: step3. sleep 1s and call sync.
1329      * @tc.expected: step3. Sync will return BUSY.
1330      */
1331     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1332     std::map<std::string, DBStatus> result;
1333     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, true);
1334     EXPECT_EQ(status, OK);
1335 }
1336 
1337 /**
1338   * @tc.name: SyncQueue001
1339   * @tc.desc: Invalid args check of Pragma GET_QUEUED_SYNC_SIZE SET_QUEUED_SYNC_LIMIT and
1340   * GET_QUEUED_SYNC_LIMIT, expect return INVALID_ARGS.
1341   * @tc.type: FUNC
1342   * @tc.require: AR000D4876
1343   * @tc.author: wangchuanqing
1344   */
1345 HWTEST_F(DistributedDBSingleVerP2PSyncTest, SyncQueue001, TestSize.Level3)
1346 {
1347     /**
1348      * @tc.steps:step1. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE, and set param to be null
1349      * @tc.expected: step1. Expect return INVALID_ARGS.
1350      */
1351     int *param = nullptr;
1352     PragmaData input = static_cast<PragmaData>(param);
1353     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), INVALID_ARGS);
1354 
1355     /**
1356      * @tc.steps:step2. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be null
1357      * @tc.expected: step2. Expect return INVALID_ARGS.
1358      */
1359     input = static_cast<PragmaData>(param);
1360     EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1361 
1362     /**
1363      * @tc.steps:step3. Set PragmaCmd to be GET_QUEUED_SYNC_LIMIT, and set param to be null
1364      * @tc.expected: step3. Expect return INVALID_ARGS.
1365      */
1366     input = static_cast<PragmaData>(param);
1367     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1368 
1369     /**
1370      * @tc.steps:step4. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be QUEUED_SYNC_LIMIT_MIN - 1
1371      * @tc.expected: step4. Expect return INVALID_ARGS.
1372      */
1373     int limit = DBConstant::QUEUED_SYNC_LIMIT_MIN - 1;
1374     input = static_cast<PragmaData>(&limit);
1375     EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1376 
1377     /**
1378      * @tc.steps:step5. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be QUEUED_SYNC_LIMIT_MAX + 1
1379      * @tc.expected: step5. Expect return INVALID_ARGS.
1380      */
1381     limit = DBConstant::QUEUED_SYNC_LIMIT_MAX + 1;
1382     input = static_cast<PragmaData>(&limit);
1383     EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), INVALID_ARGS);
1384 }
1385 
1386 /**
1387   * @tc.name: SyncQueue002
1388   * @tc.desc: Pragma GET_QUEUED_SYNC_LIMIT and SET_QUEUED_SYNC_LIMIT
1389   * @tc.type: FUNC
1390   * @tc.require: AR000D4876
1391   * @tc.author: wangchuanqing
1392   */
1393 HWTEST_F(DistributedDBSingleVerP2PSyncTest, SyncQueue002, TestSize.Level3)
1394 {
1395     /**
1396      * @tc.steps:step1. Set PragmaCmd to be GET_QUEUED_SYNC_LIMIT,
1397      * @tc.expected: step1. Expect return OK, limit eq QUEUED_SYNC_LIMIT_DEFAULT.
1398      */
1399     int limit = 0;
1400     PragmaData input = static_cast<PragmaData>(&limit);
1401     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), OK);
1402     EXPECT_EQ(limit, DBConstant::QUEUED_SYNC_LIMIT_DEFAULT);
1403 
1404     /**
1405      * @tc.steps:step2. Set PragmaCmd to be SET_QUEUED_SYNC_LIMIT, and set param to be 50
1406      * @tc.expected: step2. Expect return OK.
1407      */
1408     limit = 50;
1409     input = static_cast<PragmaData>(&limit);
1410     EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), OK);
1411 
1412     /**
1413      * @tc.steps:step3. Set PragmaCmd to be GET_QUEUED_SYNC_LIMIT,
1414      * @tc.expected: step3. Expect return OK, limit eq 50
1415      */
1416     limit = 0;
1417     input = static_cast<PragmaData>(&limit);
1418     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), OK);
1419     EXPECT_EQ(limit, 50);
1420 }
1421 
1422 /**
1423   * @tc.name: SyncQueue003
1424   * @tc.desc: sync queue test
1425   * @tc.type: FUNC
1426   * @tc.require: AR000D4876
1427   * @tc.author: wangchuanqing
1428   */
1429 HWTEST_F(DistributedDBSingleVerP2PSyncTest, SyncQueue003, TestSize.Level3)
1430 {
1431     DBStatus status = OK;
1432     std::vector<std::string> devices;
1433     devices.push_back(g_deviceB->GetDeviceId());
1434     devices.push_back(g_deviceC->GetDeviceId());
1435 
1436     /**
1437      * @tc.steps:step1. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE,
1438      * @tc.expected: step1. Expect return OK, size eq 0.
1439      */
1440     int size;
1441     PragmaData input = static_cast<PragmaData>(&size);
1442     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), OK);
1443     EXPECT_EQ(size, 0);
1444 
1445     /**
1446      * @tc.steps:step2. deviceA put {k1, v1}
1447      */
1448     status = g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1449     ASSERT_TRUE(status == OK);
1450 
1451     /**
1452      * @tc.steps:step3. deviceA sync SYNC_MODE_PUSH_ONLY
1453      */
1454     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1455     ASSERT_TRUE(status == OK);
1456 
1457     /**
1458      * @tc.steps:step4. deviceA put {k2, v2}
1459      */
1460     status = g_kvDelegatePtr->Put(KEY_2, VALUE_2);
1461     ASSERT_TRUE(status == OK);
1462 
1463     /**
1464      * @tc.steps:step5. deviceA sync SYNC_MODE_PUSH_ONLY
1465      */
1466     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1467     ASSERT_TRUE(status == OK);
1468 
1469     /**
1470      * @tc.steps:step6. deviceB put {k3, v3}
1471      */
1472     g_deviceB->PutData(KEY_3, VALUE_3, 0, 0);
1473 
1474     /**
1475      * @tc.steps:step7. deviceA put {k4, v4}
1476      */
1477     status = g_kvDelegatePtr->Put(KEY_4, VALUE_4);
1478     ASSERT_TRUE(status == OK);
1479 
1480     /**
1481      * @tc.steps:step8. deviceA sync SYNC_MODE_PUSH_PULL
1482      */
1483     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_PULL, nullptr, false);
1484     ASSERT_TRUE(status == OK);
1485 
1486     /**
1487      * @tc.steps:step9. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE,
1488      * @tc.expected: step1. Expect return OK, 0 <= size <= 4
1489      */
1490     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), OK);
1491     ASSERT_TRUE((size >= 0) && (size <= 4));
1492 
1493     /**
1494      * @tc.steps:step10. deviceB put {k5, v5}
1495      */
1496     g_deviceB->PutData(KEY_5, VALUE_5, 0, 0);
1497 
1498     /**
1499      * @tc.steps:step11. deviceA call sync and wait
1500      * @tc.expected: step11. sync should return OK.
1501      */
1502     std::map<std::string, DBStatus> result;
1503     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
1504     ASSERT_TRUE(status == OK);
1505 
1506     /**
1507      * @tc.expected: step11. onComplete should be called, DeviceA,B,C have {k1,v1}~ {KEY_5,VALUE_5}
1508      */
1509     ASSERT_TRUE(result.size() == devices.size());
1510     for (const auto &pair : result) {
1511         EXPECT_TRUE(pair.second == OK);
1512     }
1513     VirtualDataItem item;
1514     g_deviceB->GetData(KEY_1, item);
1515     EXPECT_TRUE(item.value == VALUE_1);
1516     g_deviceB->GetData(KEY_2, item);
1517     EXPECT_TRUE(item.value == VALUE_2);
1518     g_deviceB->GetData(KEY_3, item);
1519     EXPECT_TRUE(item.value == VALUE_3);
1520     g_deviceB->GetData(KEY_4, item);
1521     EXPECT_TRUE(item.value == VALUE_4);
1522     g_deviceB->GetData(KEY_5, item);
1523     EXPECT_TRUE(item.value == VALUE_5);
1524     Value value;
1525     EXPECT_EQ(g_kvDelegatePtr->Get(KEY_3, value), OK);
1526     EXPECT_EQ(VALUE_3, value);
1527     EXPECT_EQ(g_kvDelegatePtr->Get(KEY_5, value), OK);
1528     EXPECT_EQ(VALUE_5, value);
1529 }
1530 
1531 /**
1532   * @tc.name: SyncQueue004
1533   * @tc.desc: sync queue full test
1534   * @tc.type: FUNC
1535   * @tc.require: AR000D4876
1536   * @tc.author: wangchuanqing
1537   */
1538 HWTEST_F(DistributedDBSingleVerP2PSyncTest, SyncQueue004, TestSize.Level3)
1539 {
1540     DBStatus status = OK;
1541     std::vector<std::string> devices;
1542     devices.push_back(g_deviceB->GetDeviceId());
1543     devices.push_back(g_deviceC->GetDeviceId());
1544 
1545     /**
1546      * @tc.steps:step1. deviceB C block
1547      */
1548     g_communicatorAggregator->SetBlockValue(true);
1549 
1550     /**
1551      * @tc.steps:step2. deviceA put {k1, v1}
1552      */
1553     status = g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1554     ASSERT_TRUE(status == OK);
1555 
1556     /**
1557      * @tc.steps:step3. deviceA sync QUEUED_SYNC_LIMIT_DEFAULT times
1558      * @tc.expected: step3. Expect return OK
1559      */
1560     for (int i = 0; i < DBConstant::QUEUED_SYNC_LIMIT_DEFAULT; i++) {
1561         status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1562         ASSERT_TRUE(status == OK);
1563     }
1564 
1565     /**
1566      * @tc.steps:step4. deviceA sync
1567      * @tc.expected: step4. Expect return BUSY
1568      */
1569     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, false);
1570     ASSERT_TRUE(status == BUSY);
1571     g_communicatorAggregator->SetBlockValue(false);
1572 }
1573 
1574 /**
1575   * @tc.name: SyncQueue005
1576   * @tc.desc: block sync queue test
1577   * @tc.type: FUNC
1578   * @tc.require: AR000D4876
1579   * @tc.author: wangchuanqing
1580   */
1581 HWTEST_F(DistributedDBSingleVerP2PSyncTest, SyncQueue005, TestSize.Level3)
1582 {
1583     std::vector<std::string> devices;
1584     devices.push_back(g_deviceB->GetDeviceId());
1585     devices.push_back(g_deviceC->GetDeviceId());
1586     /**
1587      * @tc.steps:step1. New a thread to deviceA call block push sync to deviceB & deviceC,
1588      *      but deviceB & C is offline
1589      * @tc.expected: step1. Sync will be blocked util timeout, and then return OK
1590      */
1591     g_deviceB->Offline();
1592     g_deviceC->Offline();
1593     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1594 
1595     /**
1596      * @tc.steps:step2. deviceA put {k1, v1}
1597      */
1598     g_kvDelegatePtr->Put(KEY_1, VALUE_1);
1599 
1600     std::mutex lockMutex;
1601     std::condition_variable conditionVar;
1602 
__anone9df75cb0402()1603     std::thread threadFirst([devices](){
1604         std::map<std::string, DBStatus> resultInner;
1605         DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, resultInner, true);
1606         EXPECT_EQ(status, OK);
1607     });
1608     threadFirst.detach();
1609     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1610     /**
1611      * @tc.steps:step3. New a thread to deviceA call block push sync to deviceB & deviceC,
1612      *      but deviceB & C is offline
1613      * @tc.expected: step2. Sync will be blocked util timeout, and then return OK
1614      */
__anone9df75cb0502()1615     std::thread threadSecond([devices, &lockMutex, &conditionVar](){
1616         std::map<std::string, DBStatus> resultInner;
1617         DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, resultInner, true);
1618         EXPECT_EQ(status, OK);
1619         std::unique_lock<mutex> lockInner(lockMutex);
1620         conditionVar.notify_one();
1621     });
1622     threadSecond.detach();
1623 
1624     /**
1625      * @tc.steps:step4. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE,
1626      * @tc.expected: step1. Expect return OK, size eq 0.
1627      */
1628     int size;
1629     PragmaData input = static_cast<PragmaData>(&size);
1630     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), OK);
1631     EXPECT_EQ(size, 0);
1632 
1633     /**
1634      * @tc.steps:step5. wait exit
1635      */
1636     std::unique_lock<mutex> lock(lockMutex);
1637     auto now = std::chrono::system_clock::now();
1638     conditionVar.wait_until(lock, now + 2 * INT8_MAX * 1000ms);
1639 }
1640 
1641 /**
1642   * @tc.name: PermissionCheck001
1643   * @tc.desc: deviceA PermissionCheck not pass test, SYNC_MODE_PUSH_ONLY
1644   * @tc.type: FUNC
1645   * @tc.require: AR000D4876
1646   * @tc.author: wangchuanqing
1647   */
1648 HWTEST_F(DistributedDBSingleVerP2PSyncTest, PermissionCheck001, TestSize.Level3)
1649 {
1650     /**
1651      * @tc.steps: step1. SetPermissionCheckCallback
1652      * @tc.expected: step1. return OK.
1653      */
1654     auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
__anone9df75cb0602(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, uint8_t flag) 1655                                         const std::string &deviceId, uint8_t flag) -> bool {
1656                                         if (flag & CHECK_FLAG_SEND) {
1657                                             LOGD("in RunPermissionCheck callback func, check not pass, flag:%d", flag);
1658                                             return false;
1659                                         } else {
1660                                             LOGD("in RunPermissionCheck callback func, check pass, flag:%d", flag);
1661                                             return true;
1662                                         }
1663                                         };
1664     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
1665     DBStatus status = OK;
1666     std::vector<std::string> devices;
1667     devices.push_back(g_deviceB->GetDeviceId());
1668     devices.push_back(g_deviceC->GetDeviceId());
1669 
1670     /**
1671      * @tc.steps: step2. deviceA put {k1, v1}
1672      */
1673     Key key = {'1'};
1674     Value value = {'1'};
1675     status = g_kvDelegatePtr->Put(key, value);
1676     ASSERT_TRUE(status == OK);
1677 
1678     /**
1679      * @tc.steps: step3. deviceA call sync and wait
1680      * @tc.expected: step3. sync should return OK.
1681      */
1682     std::map<std::string, DBStatus> result;
1683     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1684     ASSERT_TRUE(status == OK);
1685 
1686     /**
1687      * @tc.expected: step3. onComplete should be called,
1688      * status == PERMISSION_CHECK_FORBID_SYNC, deviceB and deviceC do not have {k1, v1}
1689      */
1690     ASSERT_TRUE(result.size() == devices.size());
1691     for (const auto &pair : result) {
1692         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1693         EXPECT_TRUE(pair.second == PERMISSION_CHECK_FORBID_SYNC);
1694     }
1695     VirtualDataItem item;
1696     g_deviceB->GetData(key, item);
1697     EXPECT_TRUE(item.value.empty());
1698     g_deviceC->GetData(key, item);
1699     EXPECT_TRUE(item.value.empty());
1700     PermissionCheckCallbackV2 nullCallback;
1701     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
1702 }
1703 
1704 /**
1705   * @tc.name: PermissionCheck002
1706   * @tc.desc: deviceA PermissionCheck not pass test, SYNC_MODE_PULL_ONLY
1707   * @tc.type: FUNC
1708   * @tc.require: AR000D4876
1709   * @tc.author: wangchuanqing
1710   */
1711 HWTEST_F(DistributedDBSingleVerP2PSyncTest, PermissionCheck002, TestSize.Level3)
1712 {
1713     /**
1714      * @tc.steps: step1. SetPermissionCheckCallback
1715      * @tc.expected: step1. return OK.
1716      */
1717     auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
__anone9df75cb0702(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, uint8_t flag) 1718                                         const std::string &deviceId, uint8_t flag) -> bool {
1719                                         if (flag & CHECK_FLAG_RECEIVE) {
1720                                             LOGD("in RunPermissionCheck callback func, check not pass, flag:%d", flag);
1721                                             return false;
1722                                         } else {
1723                                             LOGD("in RunPermissionCheck callback func, check pass, flag:%d", flag);
1724                                             return true;
1725                                         }
1726                                         };
1727 
1728     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
1729 
1730     DBStatus status = OK;
1731     std::vector<std::string> devices;
1732     devices.push_back(g_deviceB->GetDeviceId());
1733     devices.push_back(g_deviceC->GetDeviceId());
1734 
1735     /**
1736      * @tc.steps: step2. deviceB put {k1, v1}
1737      */
1738     Key key = {'1'};
1739     Value value = {'1'};
1740     g_deviceB->PutData(key, value, 0, 0);
1741 
1742     /**
1743      * @tc.steps: step2. deviceB put {k2, v2}
1744      */
1745     Key key2 = {'2'};
1746     Value value2 = {'2'};
1747     g_deviceC->PutData(key2, value2, 0, 0);
1748     ASSERT_TRUE(status == OK);
1749 
1750     /**
1751      * @tc.steps: step3. deviceA call pull sync
1752      * @tc.expected: step3. sync should return OK.
1753      */
1754     std::map<std::string, DBStatus> result;
1755     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
1756     ASSERT_TRUE(status == OK);
1757 
1758     /**
1759      * @tc.expected: step3. onComplete should be called,
1760      * status == PERMISSION_CHECK_FORBID_SYNC, DeviceA do not have {k1, VALUE_1}, {K2. VALUE_2}
1761      */
1762     ASSERT_TRUE(result.size() == devices.size());
1763     for (const auto &pair : result) {
1764         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1765         EXPECT_TRUE(pair.second == PERMISSION_CHECK_FORBID_SYNC);
1766     }
1767     Value value3;
1768     EXPECT_EQ(g_kvDelegatePtr->Get(key, value3), NOT_FOUND);
1769     EXPECT_EQ(g_kvDelegatePtr->Get(key2, value3), NOT_FOUND);
1770     PermissionCheckCallbackV2 nullCallback;
1771     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
1772 }
1773 
1774 /**
1775   * @tc.name: PermissionCheck003
1776   * @tc.desc: deviceA PermissionCheck not pass test, SYNC_MODE_PUSH_PULL
1777   * @tc.type: FUNC
1778   * @tc.require: AR000D4876
1779   * @tc.author: wangchuanqing
1780   */
1781 HWTEST_F(DistributedDBSingleVerP2PSyncTest, PermissionCheck003, TestSize.Level3)
1782 {
1783     /**
1784      * @tc.steps: step1. SetPermissionCheckCallback
1785      * @tc.expected: step1. return OK.
1786      */
1787     auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
__anone9df75cb0802(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, uint8_t flag) 1788                                         const std::string &deviceId, uint8_t flag) -> bool {
1789                                         if (flag & (CHECK_FLAG_SEND | CHECK_FLAG_RECEIVE)) {
1790                                             LOGD("in RunPermissionCheck callback func, check not pass, flag:%d", flag);
1791                                             return false;
1792                                         } else {
1793                                             LOGD("in RunPermissionCheck callback func, check pass, flag:%d", flag);
1794                                             return true;
1795                                         }
1796                                         };
1797     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
1798 
1799     std::vector<std::string> devices;
1800     devices.push_back(g_deviceB->GetDeviceId());
1801     devices.push_back(g_deviceC->GetDeviceId());
1802 
1803     /**
1804      * @tc.steps: step2. deviceA put {k1, v1}
1805      */
1806     Key key1 = {'1'};
1807     Value value1 = {'1'};
1808     DBStatus status = g_kvDelegatePtr->Put(key1, value1);
1809     EXPECT_TRUE(status == OK);
1810 
1811     /**
1812      * @tc.steps: step2. deviceB put {k2, v2}
1813      */
1814     Key key2 = {'2'};
1815     Value value2 = {'2'};
1816     g_deviceB->PutData(key2, value2, 0, 0);
1817 
1818     /**
1819      * @tc.steps: step2. deviceB put {k3, v3}
1820      */
1821     Key key3 = {'3'};
1822     Value value3 = {'3'};
1823     g_deviceC->PutData(key3, value3, 0, 0);
1824 
1825     /**
1826      * @tc.steps: step3. deviceA call push_pull sync
1827      * @tc.expected: step3. sync should return OK.
1828      * onComplete should be called, status == PERMISSION_CHECK_FORBID_SYNC
1829      */
1830     std::map<std::string, DBStatus> result;
1831     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
1832     ASSERT_TRUE(status == OK);
1833 
1834     ASSERT_TRUE(result.size() == devices.size());
1835     for (const auto &pair : result) {
1836         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1837         EXPECT_TRUE(pair.second == PERMISSION_CHECK_FORBID_SYNC);
1838     }
1839 
1840     /**
1841      * @tc.expected: step3. DeviceA only have {k1. v1}
1842      *      DeviceB only have {k2. v2}, DeviceC only have {k3. v3}
1843      */
1844     Value value4;
1845     EXPECT_TRUE(g_kvDelegatePtr->Get(key2, value4) == NOT_FOUND);
1846     EXPECT_TRUE(g_kvDelegatePtr->Get(key3, value4) == NOT_FOUND);
1847 
1848     VirtualDataItem item1;
1849     g_deviceB->GetData(key1, item1);
1850     EXPECT_TRUE(item1.value.empty());
1851     g_deviceB->GetData(key3, item1);
1852     EXPECT_TRUE(item1.value.empty());
1853 
1854     VirtualDataItem item2;
1855     g_deviceC->GetData(key1, item2);
1856     EXPECT_TRUE(item1.value.empty());
1857     g_deviceC->GetData(key2, item2);
1858     EXPECT_TRUE(item2.value.empty());
1859     PermissionCheckCallbackV2 nullCallback;
1860     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
1861 }
1862 
1863 /**
1864   * @tc.name: PermissionCheck004
1865   * @tc.desc: deviceB and deviceC PermissionCheck not pass test, SYNC_MODE_PUSH_ONLY
1866   * @tc.type: FUNC
1867   * @tc.require: AR000D4876
1868   * @tc.author: wangchuanqing
1869   */
1870 HWTEST_F(DistributedDBSingleVerP2PSyncTest, PermissionCheck004, TestSize.Level3)
1871 {
1872     /**
1873      * @tc.steps: step1. SetPermissionCheckCallback
1874      * @tc.expected: step1. return OK.
1875      */
1876     auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
__anone9df75cb0902(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, uint8_t flag) 1877                                         const std::string &deviceId, uint8_t flag) -> bool {
1878                                         if (flag & CHECK_FLAG_RECEIVE) {
1879                                             LOGD("in RunPermissionCheck callback func, check not pass, flag:%d", flag);
1880                                             return false;
1881                                         } else {
1882                                             LOGD("in RunPermissionCheck callback func, check pass, flag:%d", flag);
1883                                             return true;
1884                                         }
1885                                         };
1886     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
1887     DBStatus status = OK;
1888     std::vector<std::string> devices;
1889     devices.push_back(g_deviceB->GetDeviceId());
1890     devices.push_back(g_deviceC->GetDeviceId());
1891 
1892     /**
1893      * @tc.steps: step2. deviceA put {k1, v1}
1894      */
1895     Key key = {'1'};
1896     Value value = {'1'};
1897     status = g_kvDelegatePtr->Put(key, value);
1898     ASSERT_TRUE(status == OK);
1899 
1900     /**
1901      * @tc.steps: step3. deviceA call sync and wait
1902      * @tc.expected: step3. sync should return OK.
1903      */
1904     std::map<std::string, DBStatus> result;
1905     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1906     ASSERT_TRUE(status == OK);
1907 
1908     /**
1909      * @tc.expected: step3. onComplete should be called,
1910      * status == PERMISSION_CHECK_FORBID_SYNC, deviceB and deviceC do not have {k1, v1}
1911      */
1912     ASSERT_TRUE(result.size() == devices.size());
1913     for (const auto &pair : result) {
1914         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1915         EXPECT_TRUE(pair.second == PERMISSION_CHECK_FORBID_SYNC);
1916     }
1917     VirtualDataItem item;
1918     g_deviceB->GetData(key, item);
1919     EXPECT_TRUE(item.value.empty());
1920     g_deviceC->GetData(key, item);
1921     EXPECT_TRUE(item.value.empty());
1922     PermissionCheckCallbackV2 nullCallback;
1923     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
1924 }
1925 
1926 /**
1927   * @tc.name: PermissionCheck005
1928   * @tc.desc: deviceB and deviceC PermissionCheck not pass test, SYNC_MODE_PULL_ONLY
1929   * @tc.type: FUNC
1930   * @tc.require: AR000D4876
1931   * @tc.author: wangchuanqing
1932   */
1933 HWTEST_F(DistributedDBSingleVerP2PSyncTest, PermissionCheck005, TestSize.Level3)
1934 {
1935     /**
1936      * @tc.steps: step1. SetPermissionCheckCallback
1937      * @tc.expected: step1. return OK.
1938      */
1939     auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
__anone9df75cb0a02(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, uint8_t flag) 1940                                         const std::string &deviceId, uint8_t flag) -> bool {
1941                                         if (flag & CHECK_FLAG_SEND) {
1942                                             LOGD("in RunPermissionCheck callback func, check not pass, flag:%d", flag);
1943                                             return false;
1944                                         } else {
1945                                             LOGD("in RunPermissionCheck callback func, check pass, flag:%d", flag);
1946                                             return true;
1947                                         }
1948                                         };
1949     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
1950 
1951     DBStatus status = OK;
1952     std::vector<std::string> devices;
1953     devices.push_back(g_deviceB->GetDeviceId());
1954     devices.push_back(g_deviceC->GetDeviceId());
1955 
1956     /**
1957      * @tc.steps: step2. deviceB put {k1, v1}
1958      */
1959     Key key = {'1'};
1960     Value value = {'1'};
1961     g_deviceB->PutData(key, value, 0, 0);
1962 
1963     /**
1964      * @tc.steps: step2. deviceB put {k2, v2}
1965      */
1966     Key key2 = {'2'};
1967     Value value2 = {'2'};
1968     g_deviceC->PutData(key2, value2, 0, 0);
1969     ASSERT_TRUE(status == OK);
1970 
1971     /**
1972      * @tc.steps: step3. deviceA call pull sync
1973      * @tc.expected: step3. sync should return OK.
1974      */
1975     std::map<std::string, DBStatus> result;
1976     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
1977     ASSERT_TRUE(status == OK);
1978 
1979     /**
1980      * @tc.expected: step3. onComplete should be called,
1981      * status == PERMISSION_CHECK_FORBID_SYNC, DeviceA do not have {k1, VALUE_1}, {K2. VALUE_2}
1982      */
1983     ASSERT_TRUE(result.size() == devices.size());
1984     for (const auto &pair : result) {
1985         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1986         EXPECT_TRUE(pair.second == PERMISSION_CHECK_FORBID_SYNC);
1987     }
1988     Value value3;
1989     EXPECT_EQ(g_kvDelegatePtr->Get(key, value3), NOT_FOUND);
1990     EXPECT_EQ(g_kvDelegatePtr->Get(key2, value3), NOT_FOUND);
1991     PermissionCheckCallbackV2 nullCallback;
1992     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
1993 }
1994 
1995 /**
1996   * @tc.name: PermissionCheck006
1997   * @tc.desc: deviceA PermissionCheck deviceB not pass, deviceC pass
1998   * @tc.type: FUNC
1999   * @tc.require: AR000EJJOJ
2000   * @tc.author: wangchuanqing
2001   */
2002 HWTEST_F(DistributedDBSingleVerP2PSyncTest, PermissionCheck006, TestSize.Level3)
2003 {
2004     /**
2005      * @tc.steps: step1. SetPermissionCheckCallback
2006      * @tc.expected: step1. return OK.
2007      */
2008     auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
__anone9df75cb0b02(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, uint8_t flag) 2009                                         const std::string &deviceId, uint8_t flag) -> bool {
2010                                         if (deviceId == g_deviceB->GetDeviceId()) {
2011                                             LOGD("in RunPermissionCheck callback func, check not pass, flag:%d", flag);
2012                                             return false;
2013                                         } else {
2014                                             LOGD("in RunPermissionCheck callback func, check pass, flag:%d", flag);
2015                                             return true;
2016                                         }
2017                                         };
2018     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
2019     DBStatus status = OK;
2020     std::vector<std::string> devices;
2021     devices.push_back(g_deviceB->GetDeviceId());
2022     devices.push_back(g_deviceC->GetDeviceId());
2023 
2024     /**
2025      * @tc.steps: step2. deviceA put {k1, v1}
2026      */
2027     Key key = {'1'};
2028     Value value = {'1'};
2029     status = g_kvDelegatePtr->Put(key, value);
2030     ASSERT_TRUE(status == OK);
2031 
2032     /**
2033      * @tc.steps: step3. deviceA call sync and wait
2034      * @tc.expected: step3. sync should return OK.
2035      */
2036     std::map<std::string, DBStatus> result;
2037     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2038     ASSERT_TRUE(status == OK);
2039 
2040     /**
2041      * @tc.expected: step3. onComplete should be called,
2042      * status == PERMISSION_CHECK_FORBID_SYNC, deviceB and deviceC do not have {k1, v1}
2043      */
2044     ASSERT_TRUE(result.size() == devices.size());
2045     for (const auto &pair : result) {
2046         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2047         if (g_deviceB->GetDeviceId() == pair.first) {
2048             EXPECT_TRUE(pair.second == PERMISSION_CHECK_FORBID_SYNC);
2049         } else {
2050             EXPECT_TRUE(pair.second == OK);
2051         }
2052     }
2053     VirtualDataItem item;
2054     g_deviceB->GetData(key, item);
2055     EXPECT_TRUE(item.value.empty());
2056     g_deviceC->GetData(key, item);
2057     EXPECT_TRUE(item.value == value);
2058     PermissionCheckCallbackV2 nullCallback;
2059     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
2060 }
2061 
2062 /**
2063   * @tc.name: PermissionCheck007
2064   * @tc.desc: deviceA PermissionCheck, deviceB not pass, deviceC pass in SYNC_MODE_AUTO_PUSH
2065   * @tc.type: FUNC
2066   * @tc.require: AR000G3RLS
2067   * @tc.author: zhuwentao
2068   */
2069 HWTEST_F(DistributedDBSingleVerP2PSyncTest, PermissionCheck007, TestSize.Level3)
2070 {
2071     /**
2072      * @tc.steps: step1. SetPermissionCheckCallback
2073      * @tc.expected: step1. return OK.
2074      */
2075     auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
__anone9df75cb0c02(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, uint8_t flag) 2076                                         const std::string &deviceId, uint8_t flag) -> bool {
2077                                         if (deviceId == g_deviceC->GetDeviceId() &&
2078                                             (flag & (CHECK_FLAG_RECEIVE | CHECK_FLAG_AUTOSYNC))) {
2079                                             LOGD("in RunPermissionCheck callback func, check not pass, flag:%d", flag);
2080                                             return false;
2081                                         } else {
2082                                             LOGD("in RunPermissionCheck callback func, check pass, flag:%d", flag);
2083                                             return true;
2084                                         }
2085                                         };
2086     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
2087     DBStatus status = OK;
2088     std::vector<std::string> devices;
2089     devices.push_back(g_deviceB->GetDeviceId());
2090     devices.push_back(g_deviceC->GetDeviceId());
2091     /**
2092      * @tc.steps: step2. deviceA set auto sync
2093      */
2094     bool autoSync = true;
2095     PragmaData data = static_cast<PragmaData>(&autoSync);
2096     status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
2097     ASSERT_EQ(status, OK);
2098 
2099     /**
2100      * @tc.steps: step3. deviceA put {k1, v1}, and sleep 1s
2101      */
2102     Key key = {'1'};
2103     Value value = {'1'};
2104     status = g_kvDelegatePtr->Put(key, value);
2105     ASSERT_TRUE(status == OK);
2106     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
2107 
2108     /**
2109      * @tc.steps: step3. check value in device B and not in device C.
2110      */
2111     VirtualDataItem item;
2112     g_deviceC->GetData(key, item);
2113     EXPECT_TRUE(item.value.empty());
2114     g_deviceB->GetData(key, item);
2115     EXPECT_TRUE(item.value == value);
2116     PermissionCheckCallbackV2 nullCallback;
2117     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
2118 }
2119 
2120 /**
2121 +  * @tc.name: PermissionCheck008
2122 +  * @tc.desc: deviceA PermissionCheck, deviceB not pass, deviceC pass in SYNC_MODE_AUTO_PULL
2123 +  * @tc.type: FUNC
2124 +  * @tc.require: AR000G3RLS
2125 +  * @tc.author: zhangqiquan
2126 +  */
2127 HWTEST_F(DistributedDBSingleVerP2PSyncTest, PermissionCheck008, TestSize.Level3)
2128 {
2129     /**
2130      * @tc.steps: step1. SetPermissionCheckCallback
2131      * @tc.expected: step1. return OK.
2132      */
2133     auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
__anone9df75cb0d02(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, uint8_t flag) 2134                                         const std::string &deviceId, uint8_t flag) -> bool {
2135                                         if (deviceId == g_deviceC->GetDeviceId() &&
2136                                             (flag & CHECK_FLAG_SPONSOR)) {
2137                                             LOGD("in RunPermissionCheck callback func, check not pass, flag:%d", flag);
2138                                             return false;
2139                                         } else {
2140                                             LOGD("in RunPermissionCheck callback func, check pass, flag:%d", flag);
2141                                             return true;
2142                                         }
2143                                         };
2144     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
2145     DBStatus status = OK;
2146     std::vector<std::string> devices;
2147     devices.push_back(g_deviceB->GetDeviceId());
2148     devices.push_back(g_deviceC->GetDeviceId());
2149 
2150     /**
2151      * @tc.steps: step2. deviceB put {k1, v1}
2152      */
2153     Key key = {'1'};
2154     Value value = {'1'};
2155     g_deviceB->PutData(key, value, 0, 0);
2156 
2157     /**
2158      * @tc.steps: step2. device put {k2, v2}
2159      */
2160     Key key2 = {'2'};
2161     Value value2 = {'2'};
2162     g_deviceC->PutData(key2, value2, 0, 0);
2163     ASSERT_TRUE(status == OK);
2164 
2165     /**
2166      * @tc.steps: step3. deviceA call push sync
2167      * @tc.expected: step3. sync should return OK.
2168      */
2169     std::map<std::string, DBStatus> result;
2170     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
2171     ASSERT_TRUE(status == OK);
2172     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
2173 
2174     /**
2175      * @tc.expected: step4. onComplete should be called,
2176      * status == PERMISSION_CHECK_FORBID_SYNC, deviceB and deviceC do not have {k1, v1}
2177      */
2178     ASSERT_TRUE(result.size() == devices.size());
2179     for (const auto &pair : result) {
2180         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2181         if (g_deviceC->GetDeviceId() == pair.first) {
2182                 EXPECT_TRUE(pair.second == PERMISSION_CHECK_FORBID_SYNC);
2183             } else {
2184                 EXPECT_TRUE(pair.second == OK);
2185         }
2186     }
2187     /**
2188      * @tc.steps: step5. check value in device A
2189      */
2190     Value value4;
2191     EXPECT_TRUE(g_kvDelegatePtr->Get(key, value4) == OK);
2192     EXPECT_TRUE(g_kvDelegatePtr->Get(key2, value4) == NOT_FOUND);
2193     PermissionCheckCallbackV2 nullCallback;
2194     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
2195 }
2196 
2197 /**
2198   * @tc.name: PermissionCheck009
2199   * @tc.desc: different runpermissioncheck call return different value
2200   * @tc.type: FUNC
2201   * @tc.require: AR000D4876
2202   * @tc.author: zhuwentao
2203   */
2204 HWTEST_F(DistributedDBSingleVerP2PSyncTest, PermissionCheck009, TestSize.Level3)
2205 {
2206     /**
2207      * @tc.steps: step1. SetPermissionCheckCallback
2208      * @tc.expected: step1. return OK.
2209      */
2210     int count = 1;
2211     auto permissionCheckCallback = [&count] (const std::string &userId, const std::string &appId,
__anone9df75cb0e02(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, uint8_t flag) 2212         const std::string &storeId, const std::string &deviceId, uint8_t flag) -> bool {
2213             (void)userId;
2214             (void)appId;
2215             (void)storeId;
2216             (void)deviceId;
2217             if (flag & CHECK_FLAG_SEND) {
2218                 bool result = count % 2;
2219                 LOGD("in RunPermissionCheck callback, check result:%d, flag:%d", result, flag);
2220                 count++;
2221                 return result;
2222             }
2223             LOGD("in RunPermissionCheck callback, check pass, flag:%d", flag);
2224             return true;
2225         };
2226     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
2227     std::vector<std::string> devices = {g_deviceB->GetDeviceId()};
2228     /**
2229      * @tc.steps: step2. deviceA call sync three times and not wait
2230      * @tc.expected: step2. sync should return OK.
2231      */
2232     std::map<std::string, DBStatus> result;
2233     ASSERT_TRUE(g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anone9df75cb0f02(const std::map<std::string, DBStatus>& statusMap) 2234         [&result](const std::map<std::string, DBStatus>& statusMap) {
2235             result = statusMap;
2236         }, false) == OK);
2237     std::map<std::string, DBStatus> result2;
2238     ASSERT_TRUE(g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anone9df75cb1002(const std::map<std::string, DBStatus>& statusMap) 2239         [&result2](const std::map<std::string, DBStatus>& statusMap) {
2240             result2 = statusMap;
2241         }, false) == OK);
2242     std::map<std::string, DBStatus> result3;
2243     ASSERT_TRUE(g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anone9df75cb1102(const std::map<std::string, DBStatus>& statusMap) 2244         [&result3](const std::map<std::string, DBStatus>& statusMap) {
2245             result3 = statusMap;
2246         }, false) == OK);
2247     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
2248     /**
2249      * @tc.expected: step3. onComplete should be called,
2250      * status is : OK, PERMISSION_CHECK_FORBID_SYNC, OK
2251      */
2252     ASSERT_TRUE(result.size() == devices.size());
2253     for (const auto &pair : result) {
2254         EXPECT_TRUE(pair.second == OK);
2255     }
2256     ASSERT_TRUE(result2.size() == devices.size());
2257     for (const auto &pair : result2) {
2258         EXPECT_TRUE(pair.second == PERMISSION_CHECK_FORBID_SYNC);
2259     }
2260     ASSERT_TRUE(result3.size() == devices.size());
2261     for (const auto &pair : result3) {
2262         EXPECT_TRUE(pair.second == OK);
2263     }
2264     PermissionCheckCallbackV2 nullCallback;
2265     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
2266 }
2267 
2268 /**
2269   * @tc.name: SaveDataNotify001
2270   * @tc.desc: Test SaveDataNotify function, delay < 30s should sync ok, > 36 should timeout
2271   * @tc.type: FUNC
2272   * @tc.require: AR000D4876
2273   * @tc.author: xushaohua
2274   */
2275 HWTEST_F(DistributedDBSingleVerP2PSyncTest, SaveDataNotify001, TestSize.Level3)
2276 {
2277     DBStatus status = OK;
2278     std::vector<std::string> devices;
2279     devices.push_back(g_deviceB->GetDeviceId());
2280 
2281     /**
2282      * @tc.steps: step1. deviceA put {k1, v1}
2283      */
2284     Key key = {'1'};
2285     Value value = {'1'};
2286     status = g_kvDelegatePtr->Put(key, value);
2287     ASSERT_TRUE(status == OK);
2288 
2289     /**
2290      * @tc.steps: step2. deviceB set sava data dely 5s
2291      */
2292     g_deviceB->SetSaveDataDelayTime(WAIT_5_SECONDS);
2293 
2294     /**
2295      * @tc.steps: step3. deviceA call sync and wait
2296      * @tc.expected: step3. sync should return OK. onComplete should be called, deviceB sync success.
2297      */
2298     std::map<std::string, DBStatus> result;
2299     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2300     ASSERT_TRUE(status == OK);
2301     ASSERT_TRUE(result.size() == devices.size());
2302     ASSERT_TRUE(result[DEVICE_B] == OK);
2303 
2304     /**
2305      * @tc.steps: step4. deviceB set sava data dely 30s and put {k1, v1}
2306      */
2307     g_deviceB->SetSaveDataDelayTime(WAIT_30_SECONDS);
2308     status = g_kvDelegatePtr->Put(key, value);
2309     ASSERT_TRUE(status == OK);
2310      /**
2311      * @tc.steps: step3. deviceA call sync and wait
2312      * @tc.expected: step3. sync should return OK. onComplete should be called, deviceB sync success.
2313      */
2314     result.clear();
2315     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2316     ASSERT_TRUE(status == OK);
2317     ASSERT_TRUE(result.size() == devices.size());
2318     ASSERT_TRUE(result[DEVICE_B] == OK);
2319 
2320     /**
2321      * @tc.steps: step4. deviceB set sava data dely 36s and put {k1, v1}
2322      */
2323     g_deviceB->SetSaveDataDelayTime(WAIT_36_SECONDS);
2324     status = g_kvDelegatePtr->Put(key, value);
2325     ASSERT_TRUE(status == OK);
2326     /**
2327      * @tc.steps: step5. deviceA call sync and wait
2328      * @tc.expected: step5. sync should return OK. onComplete should be called, deviceB sync TIME_OUT.
2329      */
2330     result.clear();
2331     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2332     ASSERT_TRUE(status == OK);
2333     ASSERT_TRUE(result.size() == devices.size());
2334     ASSERT_TRUE(result[DEVICE_B] == TIME_OUT);
2335 }
2336 
2337 /**
2338   * @tc.name: SametimeSync001
2339   * @tc.desc: Test 2 device sync with each other
2340   * @tc.type: FUNC
2341   * @tc.require: AR000CCPOM
2342   * @tc.author: zhangqiquan
2343   */
2344 HWTEST_F(DistributedDBSingleVerP2PSyncTest, SametimeSync001, TestSize.Level3)
2345 {
2346     DBStatus status = OK;
2347     std::vector<std::string> devices;
2348     devices.push_back(g_deviceB->GetDeviceId());
2349 
2350     int responseCount = 0;
2351     int requestCount = 0;
2352     Key key = {'1'};
2353     Value value = {'1'};
2354     /**
2355      * @tc.steps: step1. make sure deviceB send pull firstly and response_pull secondly
2356      * @tc.expected: step1. deviceA put data when finish push task. put data should return OK.
2357      */
2358     g_communicatorAggregator->RegOnDispatch([&responseCount, &requestCount, &key, &value](
__anone9df75cb1202( const std::string &target, DistributedDB::Message *msg) 2359         const std::string &target, DistributedDB::Message *msg) {
2360         if (target == "real_device" && msg->GetMessageId() == DATA_SYNC_MESSAGE) {
2361             if (msg->GetMessageType() == TYPE_RESPONSE) {
2362                 responseCount++;
2363                 if (responseCount == 1) { // 1 is the ack which B response A's push task
2364                     EXPECT_EQ(g_kvDelegatePtr->Put(key, value), DBStatus::OK);
2365                     std::this_thread::sleep_for(std::chrono::seconds(1));
2366                 } else if (responseCount == 2) { // 2 is the ack which B response A's response_pull task
2367                     msg->SetErrorNo(E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
2368                 }
2369             } if (msg->GetMessageType() == TYPE_REQUEST) {
2370                 requestCount++;
2371                 if (requestCount == 1) { // 1 is A push task
2372                     std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2 sec
2373                 }
2374             }
2375         }
2376     });
2377     /**
2378      * @tc.steps: step2. deviceA,deviceB sync to each other at same time
2379      * @tc.expected: step2. sync should return OK.
2380      */
2381     std::map<std::string, DBStatus> result;
__anone9df75cb1302null2382     std::thread subThread([]{
2383         g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, true);
2384     });
2385     status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_PULL, result);
2386     subThread.join();
2387     g_communicatorAggregator->RegOnDispatch(nullptr);
2388 
2389     EXPECT_TRUE(status == OK);
2390     ASSERT_TRUE(result.size() == devices.size());
2391     EXPECT_TRUE(result[DEVICE_B] == OK);
2392     Value actualValue;
2393     g_kvDelegatePtr->Get(key, actualValue);
2394     EXPECT_EQ(actualValue, value);
2395 }
2396 
2397 /**
2398   * @tc.name: SametimeSync002
2399   * @tc.desc: Test 2 device sync with each other with water error
2400   * @tc.type: FUNC
2401   * @tc.require: AR000CCPOM
2402   * @tc.author: zhangqiquan
2403   */
2404 HWTEST_F(DistributedDBSingleVerP2PSyncTest, SametimeSync002, TestSize.Level3)
2405 {
2406     DBStatus status = OK;
2407     std::vector<std::string> devices;
2408     devices.push_back(g_deviceB->GetDeviceId());
2409     g_kvDelegatePtr->Put({'k', '1'}, {'v', '1'});
2410     /**
2411      * @tc.steps: step1. make sure deviceA push data failed and increase water mark
2412      * @tc.expected: step1. deviceA push failed with timeout
2413      */
__anone9df75cb1402(const std::string &target, DistributedDB::Message *msg) 2414     g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
2415         ASSERT_NE(msg, nullptr);
2416         if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
2417             msg->SetMessageId(INVALID_MESSAGE_ID);
2418         }
2419     });
2420     std::map<std::string, DBStatus> result;
__anone9df75cb1502(const std::map<std::string, DBStatus> &map) 2421     auto callback = [&result](const std::map<std::string, DBStatus> &map) {
2422         result = map;
2423     };
2424     Query query = Query::Select().PrefixKey({'k', '1'});
2425     EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
2426     ASSERT_TRUE(result.size() == devices.size());
2427     EXPECT_TRUE(result[DEVICE_B] == TIME_OUT);
2428     /**
2429      * @tc.steps: step2. A push to B with query2, sleep 1s for waiting step3
2430      * @tc.expected: step2. sync should return OK.
2431      */
__anone9df75cb1602(const std::string &target, DistributedDB::Message *msg) 2432     g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
2433         ASSERT_NE(msg, nullptr);
2434         if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
2435             std::this_thread::sleep_for(std::chrono::seconds(1));
2436         }
2437     });
__anone9df75cb1702null2438     std::thread subThread([&devices] {
2439         std::map<std::string, DBStatus> result;
2440         auto callback = [&result](const std::map<std::string, DBStatus> &map) {
2441             result = map;
2442         };
2443         Query query = Query::Select().PrefixKey({'k', '2'});
2444         LOGD("Begin PUSH");
2445         EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
2446         ASSERT_TRUE(result.size() == devices.size());
2447         EXPECT_TRUE(result[DEVICE_A] == OK);
2448     });
2449     /**
2450      * @tc.steps: step3. B pull to A when A is in push task
2451      * @tc.expected: step3. sync should return OP_FINISHED_ALL.
2452      */
2453     std::this_thread::sleep_for(std::chrono::milliseconds(100));
2454     std::map<std::string, int> virtualResult;
2455     g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, query,
__anone9df75cb1902(const std::map<std::string, int> &map) 2456         [&virtualResult](const std::map<std::string, int> &map) {
2457             virtualResult = map;
2458         }, true);
2459     EXPECT_TRUE(status == OK);
2460     ASSERT_EQ(virtualResult.size(), devices.size());
2461     EXPECT_EQ(virtualResult[DEVICE_A], SyncOperation::OP_FINISHED_ALL);
2462     g_communicatorAggregator->RegOnDispatch(nullptr);
2463     subThread.join();
2464 }
2465 
2466 /**
2467  * @tc.name: DatabaseOnlineCallback001
2468  * @tc.desc: check database status notify online callback
2469  * @tc.type: FUNC
2470  * @tc.require: AR000CQS3S SR000CQE0B
2471  * @tc.author: zhuwentao
2472  */
2473 HWTEST_F(DistributedDBSingleVerP2PSyncTest, DatabaseOnlineCallback001, TestSize.Level1)
2474 {
2475     /**
2476      * @tc.steps: step1. SetStoreStatusNotifier
2477      * @tc.expected: step1. SetStoreStatusNotifier ok
2478      */
2479     std::string targetDev = "DEVICE_X";
2480     bool isCheckOk = false;
2481     auto databaseStatusNotifyCallback = [targetDev, &isCheckOk] (std::string userId,
__anone9df75cb1a02(std::string userId, std::string appId, std::string storeId, const std::string deviceId, bool onlineStatus) 2482         std::string appId, std::string storeId, const std::string deviceId, bool onlineStatus) -> void {
2483         if (userId == USER_ID && appId == APP_ID && storeId == STORE_ID && deviceId == targetDev &&
2484             onlineStatus == true) {
2485             isCheckOk = true;
2486         }};
2487     g_mgr.SetStoreStatusNotifier(databaseStatusNotifyCallback);
2488     /**
2489      * @tc.steps: step2. trigger device online
2490      * @tc.expected: step2. check callback ok
2491      */
2492     g_communicatorAggregator->OnlineDevice(targetDev);
2493     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME / 20));
2494     EXPECT_EQ(isCheckOk, true);
2495     StoreStatusNotifier nullCallback;
2496     g_mgr.SetStoreStatusNotifier(nullCallback);
2497 }
2498 
2499 /**
2500  * @tc.name: DatabaseOfflineCallback001
2501  * @tc.desc: check database status notify online callback
2502  * @tc.type: FUNC
2503  * @tc.require: AR000CQS3S SR000CQE0B
2504  * @tc.author: zhuwentao
2505  */
2506 HWTEST_F(DistributedDBSingleVerP2PSyncTest, DatabaseOfflineCallback001, TestSize.Level1)
2507 {
2508     /**
2509      * @tc.steps: step1. SetStoreStatusNotifier
2510      * @tc.expected: step1. SetStoreStatusNotifier ok
2511      */
2512     std::string targetDev = "DEVICE_X";
2513     bool isCheckOk = false;
2514     auto databaseStatusNotifyCallback = [targetDev, &isCheckOk] (std::string userId,
__anone9df75cb1b02(std::string userId, std::string appId, std::string storeId, const std::string deviceId, bool onlineStatus) 2515         std::string appId, std::string storeId, const std::string deviceId, bool onlineStatus) -> void {
2516         if (userId == USER_ID && appId == APP_ID && storeId == STORE_ID && deviceId == targetDev &&
2517             onlineStatus == false) {
2518             isCheckOk = true;
2519         }};
2520     g_mgr.SetStoreStatusNotifier(databaseStatusNotifyCallback);
2521     /**
2522      * @tc.steps: step2. trigger device offline
2523      * @tc.expected: step2. check callback ok
2524      */
2525     g_communicatorAggregator->OfflineDevice(targetDev);
2526     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME / 20));
2527     EXPECT_EQ(isCheckOk, true);
2528     StoreStatusNotifier nullCallback;
2529     g_mgr.SetStoreStatusNotifier(nullCallback);
2530 }
2531 
2532 /**
2533   * @tc.name: CloseSync001
2534   * @tc.desc: Test 2 delegate close when sync
2535   * @tc.type: FUNC
2536   * @tc.require: AR000CCPOM
2537   * @tc.author: zhangqiquan
2538   */
2539 HWTEST_F(DistributedDBSingleVerP2PSyncTest, CloseSync001, TestSize.Level3)
2540 {
2541     DBStatus status = OK;
2542     std::vector<std::string> devices;
2543     devices.push_back(g_deviceB->GetDeviceId());
2544 
2545     /**
2546      * @tc.steps: step1. make sure A sync start
2547      */
2548     bool sleep = false;
__anone9df75cb1c02(const std::string &target, DistributedDB::Message *msg) 2549     g_communicatorAggregator->RegOnDispatch([&sleep](const std::string &target, DistributedDB::Message *msg) {
2550         if (!sleep) {
2551             sleep = true;
2552             std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s for waiting close db
2553         }
2554     });
2555 
2556     KvStoreNbDelegate* kvDelegatePtrA = nullptr;
2557     KvStoreNbDelegate::Option option;
__anone9df75cb1d02(DBStatus s, KvStoreNbDelegate *delegate) 2558     g_mgr.GetKvStore(STORE_ID, option, [&status, &kvDelegatePtrA](DBStatus s, KvStoreNbDelegate *delegate) {
2559         status = s;
2560         kvDelegatePtrA = delegate;
2561     });
2562     EXPECT_EQ(status, OK);
2563     EXPECT_NE(kvDelegatePtrA, nullptr);
2564 
2565     Key key = {'k'};
2566     Value value = {'v'};
2567     kvDelegatePtrA->Put(key, value);
2568     std::map<std::string, DBStatus> result;
__anone9df75cb1e02(const std::map<std::string, DBStatus>& statusMap) 2569     auto callback = [&result](const std::map<std::string, DBStatus>& statusMap) {
2570         result = statusMap;
2571     };
2572     /**
2573      * @tc.steps: step2. deviceA sync and then close
2574      * @tc.expected: step2. sync should abort and data don't exist in B
2575      */
__anone9df75cb1f02() 2576     std::thread closeThread([&kvDelegatePtrA]() {
2577         std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for waiting sync start
2578         EXPECT_EQ(g_mgr.CloseKvStore(kvDelegatePtrA), OK);
2579     });
2580     EXPECT_EQ(kvDelegatePtrA->Sync(devices, SYNC_MODE_PUSH_ONLY, callback, false), OK);
2581     LOGD("Sync finish");
2582     closeThread.join();
2583     std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s for waiting sync finish
2584     EXPECT_EQ(result.size(), 0u);
2585     VirtualDataItem actualValue;
2586     EXPECT_EQ(g_deviceB->GetData(key, actualValue), -E_NOT_FOUND);
2587     g_communicatorAggregator->RegOnDispatch(nullptr);
2588 }
2589 
2590 /**
2591   * @tc.name: CloseSync002
2592   * @tc.desc: Test 1 delegate close when in time sync
2593   * @tc.type: FUNC
2594   * @tc.require: AR000CCPOM
2595   * @tc.author: zhangqiquan
2596   */
2597 HWTEST_F(DistributedDBSingleVerP2PSyncTest, CloseSync002, TestSize.Level3)
2598 {
2599     /**
2600      * @tc.steps: step1. invalid time sync packet from A
2601      */
__anone9df75cb2002(const std::string &target, DistributedDB::Message *msg) 2602     g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
2603         ASSERT_NE(msg, nullptr);
2604         if (target == DEVICE_B && msg->GetMessageId() == TIME_SYNC_MESSAGE && msg->GetMessageType() == TYPE_REQUEST) {
2605             msg->SetMessageId(INVALID_MESSAGE_ID);
2606             LOGD("Message is invalid");
2607         }
2608     });
2609     Timestamp currentTime;
2610     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
2611     g_deviceB->PutData({'k'}, {'v'}, currentTime, 0);
2612 
2613     /**
2614      * @tc.steps: step2. B PUSH to A and A close after 1s
2615      * @tc.expected: step2. A closing time cost letter than 4s
2616      */
__anone9df75cb2102() 2617     std::thread closingThread([]() {
2618         std::this_thread::sleep_for(std::chrono::seconds(1));
2619         LOGD("Begin Close");
2620         Timestamp beginTime;
2621         (void)OS::GetCurrentSysTimeInMicrosecond(beginTime);
2622         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
2623         Timestamp endTime;
2624         (void)OS::GetCurrentSysTimeInMicrosecond(endTime);
2625         EXPECT_LE(static_cast<int>(endTime - beginTime), 4 * 1000 * 1000); // waiting 4 * 1000 * 1000 us
2626         LOGD("End Close");
2627     });
2628     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
2629     closingThread.join();
2630 
2631     /**
2632      * @tc.steps: step3. remove db
2633      * @tc.expected: step3. remove ok
2634      */
2635     g_kvDelegatePtr = nullptr;
2636     DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
2637     LOGD("delete kv store status %d", status);
2638     ASSERT_TRUE(status == OK);
2639     g_communicatorAggregator->RegOnDispatch(nullptr);
2640 }
2641 
2642 /**
2643   * @tc.name: OrderbyWriteTimeSync001
2644   * @tc.desc: sync query with order by writeTime
2645   * @tc.type: FUNC
2646   * @tc.require: AR000H5VLO
2647   * @tc.author: zhuwentao
2648   */
2649 HWTEST_F(DistributedDBSingleVerP2PSyncTest, OrderbyWriteTimeSync001, TestSize.Level0)
2650 {
2651     /**
2652      * @tc.steps: step1. deviceA subscribe query with order by write time
2653      * * @tc.expected: step1. interface return not support
2654     */
2655     std::vector<std::string> devices;
2656     devices.push_back(g_deviceB->GetDeviceId());
2657     Query query = Query::Select().PrefixKey({'k'}).OrderByWriteTime(true);
2658     EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, nullptr, query, true), NOT_SUPPORT);
2659 }
2660 
2661 /**
2662   * @tc.name: EncryptedAlgoUpgrade001
2663   * @tc.desc: Test upgrade encrypted db can sync normally
2664   * @tc.type: FUNC
2665   * @tc.require: AR000HI2JS
2666   * @tc.author: zhuwentao
2667   */
2668 HWTEST_F(DistributedDBSingleVerP2PSyncTest, EncryptedAlgoUpgrade001, TestSize.Level3)
2669 {
2670     /**
2671      * @tc.steps: step1. clear db
2672      * * @tc.expected: step1. interface return ok
2673     */
2674     if (g_kvDelegatePtr != nullptr) {
2675         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
2676         g_kvDelegatePtr = nullptr;
2677         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
2678         LOGD("delete kv store status %d", status);
2679         ASSERT_TRUE(status == OK);
2680     }
2681 
2682     CipherPassword passwd;
2683     std::vector<uint8_t> passwdVect = {'p', 's', 'd', '1'};
2684     passwd.SetValue(passwdVect.data(), passwdVect.size());
2685     /**
2686      * @tc.steps: step2. open old db by sql
2687      * * @tc.expected: step2. interface return ok
2688     */
2689     std::string identifier = DBCommon::GenerateIdentifierId(STORE_ID, APP_ID, USER_ID);
2690     std::string hashDir = DBCommon::TransferHashString(identifier);
2691     std::string hexHashDir = DBCommon::TransferStringToHex(hashDir);
2692     std::string dbPath = g_testDir + "/" + hexHashDir + "/single_ver";
2693     ASSERT_TRUE(DBCommon::CreateDirectory(g_testDir + "/" + hexHashDir) == OK);
2694     ASSERT_TRUE(DBCommon::CreateDirectory(dbPath) == OK);
2695     std::vector<std::string> dbDir {DBConstant::MAINDB_DIR, DBConstant::METADB_DIR, DBConstant::CACHEDB_DIR};
2696     for (const auto &item : dbDir) {
2697         ASSERT_TRUE(DBCommon::CreateDirectory(dbPath + "/" + item) == OK);
2698     }
2699     uint64_t flag = SQLITE_OPEN_URI | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
2700     sqlite3 *db;
2701     std::string fileUrl = dbPath + "/" + DBConstant::MAINDB_DIR + "/" + DBConstant::SINGLE_VER_DATA_STORE + ".db";
2702     ASSERT_TRUE(sqlite3_open_v2(fileUrl.c_str(), &db, flag, nullptr) == SQLITE_OK);
2703     SQLiteUtils::SetKeyInner(db, CipherType::AES_256_GCM, passwd, DBConstant::DEFAULT_ITER_TIMES);
2704     /**
2705      * @tc.steps: step3. create table and close
2706      * * @tc.expected: step3. interface return ok
2707     */
2708     ASSERT_TRUE(SQLiteUtils::ExecuteRawSQL(db, CREATE_SYNC_TABLE_SQL) == E_OK);
2709     sqlite3_close_v2(db);
2710     db = nullptr;
2711     LOGI("create old db success");
2712     /**
2713      * @tc.steps: step4. get kvstore
2714      * * @tc.expected: step4. interface return ok
2715     */
2716     KvStoreNbDelegate::Option option;
2717     option.isEncryptedDb = true;
2718     option.cipher = CipherType::AES_256_GCM;
2719     option.passwd = passwd;
2720     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
2721     ASSERT_TRUE(g_kvDelegateStatus == OK);
2722     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2723     /**
2724      * @tc.steps: step5. sync ok
2725      * * @tc.expected: step5. interface return ok
2726     */
2727     PullSyncTest();
2728     /**
2729      * @tc.steps: step5. crud ok
2730      * * @tc.expected: step5. interface return ok
2731     */
2732     CrudTest();
2733 }
2734 
2735 /**
2736   * @tc.name: RemoveDeviceData002
2737   * @tc.desc: test remove device data before sync
2738   * @tc.type: FUNC
2739   * @tc.require:
2740   * @tc.author: zhuwentao
2741   */
2742 HWTEST_F(DistributedDBSingleVerP2PSyncTest, RemoveDeviceData002, TestSize.Level1)
2743 {
2744     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2745     /**
2746      * @tc.steps: step1. sync deviceB data to A and check data
2747      * * @tc.expected: step1. interface return ok
2748     */
2749     Key key1 = {'1'};
2750     Key key2 = {'2'};
2751     Value value = {'1'};
2752     Timestamp currentTime;
2753     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
2754     EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
2755     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
2756     EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
2757     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
2758     Value actualValue;
2759     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
2760     EXPECT_EQ(actualValue, value);
2761     actualValue.clear();
2762     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
2763     EXPECT_EQ(actualValue, value);
2764     /**
2765      * @tc.steps: step2. call RemoveDeviceData
2766      * * @tc.expected: step2. interface return ok
2767     */
2768     g_kvDelegatePtr->RemoveDeviceData(g_deviceB->GetDeviceId());
2769     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
2770     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), NOT_FOUND);
2771     /**
2772      * @tc.steps: step3. sync to device A again and check data
2773      * * @tc.expected: step3. sync ok
2774     */
2775     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
2776     actualValue.clear();
2777     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
2778     EXPECT_EQ(actualValue, value);
2779     actualValue.clear();
2780     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
2781     EXPECT_EQ(actualValue, value);
2782 }
2783 
2784 /**
2785   * @tc.name: DataSync001
2786   * @tc.desc: Test Data Sync when Initialize
2787   * @tc.type: FUNC
2788   * @tc.require: AR000HI2JS
2789   * @tc.author: zhuwentao
2790   */
2791 HWTEST_F(DistributedDBSingleVerP2PSyncTest, DataSync001, TestSize.Level1)
2792 {
2793     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
2794     ASSERT_TRUE(dataSync != nullptr);
2795     std::shared_ptr<Metadata> inMetadata = nullptr;
2796     std::string deviceId;
2797     Message message;
2798     VirtualSingleVerSyncDBInterface tmpInterface;
2799     VirtualCommunicator tmpCommunicator(deviceId, g_communicatorAggregator);
2800     EXPECT_EQ(dataSync->Initialize(nullptr, nullptr, inMetadata, deviceId), -E_INVALID_ARGS);
2801     EXPECT_EQ(dataSync->Initialize(&tmpInterface, nullptr, inMetadata, deviceId), -E_INVALID_ARGS);
2802     EXPECT_EQ(dataSync->Initialize(&tmpInterface, &tmpCommunicator, inMetadata, deviceId), -E_INVALID_ARGS);
2803     delete dataSync;
2804 }
2805 
2806 /**
2807   * @tc.name: DataSync002
2808   * @tc.desc: Test active sync with invalid param in DataSync Class
2809   * @tc.type: FUNC
2810   * @tc.require: AR000HI2JS
2811   * @tc.author: zhuwentao
2812   */
2813 HWTEST_F(DistributedDBSingleVerP2PSyncTest, DataSync002, TestSize.Level1)
2814 {
2815     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
2816     ASSERT_TRUE(dataSync != nullptr);
2817     Message message;
2818     EXPECT_EQ(dataSync->TryContinueSync(nullptr, &message), -E_INVALID_ARGS);
2819     EXPECT_EQ(dataSync->TryContinueSync(nullptr, nullptr), -E_INVALID_ARGS);
2820     EXPECT_EQ(dataSync->PushStart(nullptr), -E_INVALID_ARGS);
2821     EXPECT_EQ(dataSync->PushPullStart(nullptr), -E_INVALID_ARGS);
2822     EXPECT_EQ(dataSync->PullRequestStart(nullptr), -E_INVALID_ARGS);
2823     EXPECT_EQ(dataSync->PullResponseStart(nullptr), -E_INVALID_ARGS);
2824     delete dataSync;
2825 }
2826 
2827 /**
2828   * @tc.name: DataSync003
2829   * @tc.desc: Test receive invalid request data packet in DataSync Class
2830   * @tc.type: FUNC
2831   * @tc.require: AR000HI2JS
2832   * @tc.author: zhuwentao
2833   */
2834 HWTEST_F(DistributedDBSingleVerP2PSyncTest, DataSync003, TestSize.Level1)
2835 {
2836     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
2837     ASSERT_TRUE(dataSync != nullptr);
2838     uint64_t tmpMark = 0;
2839     Message message;
2840     EXPECT_EQ(dataSync->DataRequestRecv(nullptr, nullptr, tmpMark), -E_INVALID_ARGS);
2841     EXPECT_EQ(dataSync->DataRequestRecv(nullptr, &message, tmpMark), -E_INVALID_ARGS);
2842     delete dataSync;
2843 }
2844 
2845 /**
2846   * @tc.name: DataSync004
2847   * @tc.desc: Test receive invalid ack packet in DataSync Class
2848   * @tc.type: FUNC
2849   * @tc.require: AR000HI2JS
2850   * @tc.author: zhuwentao
2851   */
2852 HWTEST_F(DistributedDBSingleVerP2PSyncTest, DataSync004, TestSize.Level1)
2853 {
2854     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
2855     ASSERT_TRUE(dataSync != nullptr);
2856     Message message;
2857     TestSingleVerKvSyncTaskContext tmpContext;
2858     EXPECT_EQ(dataSync->AckPacketIdCheck(nullptr), false);
2859     EXPECT_EQ(dataSync->AckPacketIdCheck(&message), false);
2860     EXPECT_EQ(dataSync->AckRecv(&tmpContext, nullptr), -E_INVALID_ARGS);
2861     EXPECT_EQ(dataSync->AckRecv(nullptr, nullptr), -E_INVALID_ARGS);
2862     EXPECT_EQ(dataSync->AckRecv(nullptr, &message), -E_INVALID_ARGS);
2863     delete dataSync;
2864 }
2865 
2866 /**
2867   * @tc.name: DataSync005
2868   * @tc.desc: Test receive invalid notify packet in DataSync Class
2869   * @tc.type: FUNC
2870   * @tc.require: AR000HI2JS
2871   * @tc.author: zhuwentao
2872   */
2873 HWTEST_F(DistributedDBSingleVerP2PSyncTest, DataSync005, TestSize.Level1)
2874 {
2875     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
2876     ASSERT_TRUE(dataSync != nullptr);
2877     dataSync->SendSaveDataNotifyPacket(nullptr, 0, 0, 0, TIME_SYNC_MESSAGE);
2878     delete dataSync;
2879 }
2880 
2881 /**
2882   * @tc.name: DataSync006
2883   * @tc.desc: Test control start with invalid param in DataSync Class
2884   * @tc.type: FUNC
2885   * @tc.require: AR000HI2JS
2886   * @tc.author: zhuwentao
2887   */
2888 HWTEST_F(DistributedDBSingleVerP2PSyncTest, DataSync006, TestSize.Level1)
2889 {
2890     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
2891     ASSERT_TRUE(dataSync != nullptr);
2892     TestSingleVerKvSyncTaskContext tmpContext;
2893     EXPECT_EQ(dataSync->ControlCmdStart(nullptr), -E_INVALID_ARGS);
2894     EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_INVALID_ARGS);
2895     std::shared_ptr<SubscribeManager> subManager = std::make_shared<SubscribeManager>();
2896     tmpContext.SetSubscribeManager(subManager);
2897     tmpContext.SetMode(SyncModeType::INVALID_MODE);
2898     EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_INVALID_ARGS);
2899     std::set<Key> Keys = {{'a'}, {'b'}};
2900     Query query = Query::Select().InKeys(Keys);
2901     QuerySyncObject innerQuery(query);
2902     tmpContext.SetQuery(innerQuery);
2903     tmpContext.SetMode(SyncModeType::SUBSCRIBE_QUERY);
2904     EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_NOT_SUPPORT);
2905     delete dataSync;
2906     subManager = nullptr;
2907 }
2908 
2909 /**
2910   * @tc.name: DataSync007
2911   * @tc.desc: Test receive invalid control packet in DataSync Class
2912   * @tc.type: FUNC
2913   * @tc.require: AR000HI2JS
2914   * @tc.author: zhuwentao
2915   */
2916 HWTEST_F(DistributedDBSingleVerP2PSyncTest, DataSync007, TestSize.Level1)
2917 {
2918     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
2919     ASSERT_TRUE(dataSync != nullptr);
2920     Message message;
2921     ControlRequestPacket packet;
2922     TestSingleVerKvSyncTaskContext tmpContext;
2923     EXPECT_EQ(dataSync->ControlCmdRequestRecv(nullptr, &message), -E_INVALID_ARGS);
2924     message.SetCopiedObject(packet);
2925     EXPECT_EQ(dataSync->ControlCmdRequestRecv(nullptr, &message), -E_INVALID_ARGS);
2926     delete dataSync;
2927 }
2928 
2929 /**
2930   * @tc.name: DataSync008
2931   * @tc.desc: Test pull null msg in dataQueue in DataSync Class
2932   * @tc.type: FUNC
2933   * @tc.require: AR000HI2JS
2934   * @tc.author: zhuwentao
2935   */
2936 HWTEST_F(DistributedDBSingleVerP2PSyncTest, DataSync008, TestSize.Level1)
2937 {
2938     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
2939     ASSERT_TRUE(dataSync != nullptr);
2940     dataSync->PutDataMsg(nullptr);
2941     delete dataSync;
2942 }
2943 
2944 /**
2945  * @tc.name: SyncRetry001
2946  * @tc.desc: use sync retry sync use push
2947  * @tc.type: FUNC
2948  * @tc.require: AR000CKRTD AR000CQE0E
2949  * @tc.author: zhuwentao
2950  */
2951 HWTEST_F(DistributedDBSingleVerP2PSyncTest, SyncRetry001, TestSize.Level3)
2952 {
2953     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
2954     std::vector<std::string> devices;
2955     devices.push_back(g_deviceB->GetDeviceId());
2956 
2957     /**
2958      * @tc.steps: step1. set sync retry
2959      * @tc.expected: step1, Pragma return OK.
2960      */
2961     int pragmaData = 1;
2962     PragmaData input = static_cast<PragmaData>(&pragmaData);
2963     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
2964 
2965     /**
2966      * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
2967      */
2968     ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
2969 
2970     /**
2971      * @tc.steps: step3. deviceA call sync and wait
2972      * @tc.expected: step3. sync should return OK.
2973      */
2974     std::map<std::string, DBStatus> result;
2975     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
2976 
2977     /**
2978      * @tc.expected: step4. onComplete should be called, and status is time_out
2979      */
2980     ASSERT_TRUE(result.size() == devices.size());
2981     for (const auto &pair : result) {
2982         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2983         EXPECT_TRUE(pair.second == OK);
2984     }
2985     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
2986 }
2987 
2988 /**
2989  * @tc.name: SyncRetry002
2990  * @tc.desc: use sync retry sync use pull
2991  * @tc.type: FUNC
2992  * @tc.require: AR000CKRTD AR000CQE0E
2993  * @tc.author: zhuwentao
2994  */
2995 HWTEST_F(DistributedDBSingleVerP2PSyncTest, SyncRetry002, TestSize.Level3)
2996 {
2997     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE, 4u);
2998     std::vector<std::string> devices;
2999     devices.push_back(g_deviceB->GetDeviceId());
3000 
3001     /**
3002      * @tc.steps: step1. set sync retry
3003      * @tc.expected: step1, Pragma return OK.
3004      */
3005     int pragmaData = 1;
3006     PragmaData input = static_cast<PragmaData>(&pragmaData);
3007     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
3008 
3009     /**
3010      * @tc.steps: step2. deviceA call sync and wait
3011      * @tc.expected: step2. sync should return OK.
3012      */
3013     std::map<std::string, DBStatus> result;
3014     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
3015 
3016     /**
3017      * @tc.expected: step3. onComplete should be called, and status is time_out
3018      */
3019     ASSERT_TRUE(result.size() == devices.size());
3020     for (const auto &pair : result) {
3021         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
3022         EXPECT_TRUE(pair.second == TIME_OUT);
3023     }
3024     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
3025 }
3026 
3027 /**
3028  * @tc.name: SyncRetry003
3029  * @tc.desc: use sync retry sync use push by compress
3030  * @tc.type: FUNC
3031  * @tc.require: AR000CKRTD AR000CQE0E
3032  * @tc.author: zhuwentao
3033  */
3034 HWTEST_F(DistributedDBSingleVerP2PSyncTest, SyncRetry003, TestSize.Level3)
3035 {
3036     if (g_kvDelegatePtr != nullptr) {
3037         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
3038         g_kvDelegatePtr = nullptr;
3039     }
3040     /**
3041      * @tc.steps: step1. open db use Compress
3042      * @tc.expected: step1, Pragma return OK.
3043      */
3044     KvStoreNbDelegate::Option option;
3045     option.isNeedCompressOnSync = true;
3046     option.compressionRate = 70;
3047     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
3048     ASSERT_TRUE(g_kvDelegateStatus == OK);
3049     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
3050 
3051     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
3052     std::vector<std::string> devices;
3053     devices.push_back(g_deviceB->GetDeviceId());
3054 
3055     /**
3056      * @tc.steps: step2. set sync retry
3057      * @tc.expected: step2, Pragma return OK.
3058      */
3059     int pragmaData = 1;
3060     PragmaData input = static_cast<PragmaData>(&pragmaData);
3061     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
3062 
3063     /**
3064      * @tc.steps: step3. deviceA put {k1, v1}, {k2, v2}
3065      */
3066     ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
3067 
3068     /**
3069      * @tc.steps: step4. deviceA call sync and wait
3070      * @tc.expected: step4. sync should return OK.
3071      */
3072     std::map<std::string, DBStatus> result;
3073     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
3074 
3075     /**
3076      * @tc.expected: step5. onComplete should be called, and status is time_out
3077      */
3078     ASSERT_TRUE(result.size() == devices.size());
3079     for (const auto &pair : result) {
3080         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
3081         EXPECT_TRUE(pair.second == OK);
3082     }
3083     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
3084 }
3085 
3086 /**
3087  * @tc.name: SyncRetry004
3088  * @tc.desc: use query sync retry sync use push
3089  * @tc.type: FUNC
3090  * @tc.require: AR000CKRTD AR000CQE0E
3091  * @tc.author: zhuwentao
3092  */
3093 HWTEST_F(DistributedDBSingleVerP2PSyncTest, SyncRetry004, TestSize.Level3)
3094 {
3095     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
3096     std::vector<std::string> devices;
3097     devices.push_back(g_deviceB->GetDeviceId());
3098 
3099     /**
3100      * @tc.steps: step1. set sync retry
3101      * @tc.expected: step1, Pragma return OK.
3102      */
3103     int pragmaData = 1;
3104     PragmaData input = static_cast<PragmaData>(&pragmaData);
3105     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
3106 
3107     /**
3108      * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
3109      */
3110     for (int i = 0; i < 5; i++) {
3111         Key key = DistributedDBToolsUnitTest::GetRandPrefixKey({'a', 'b'}, 128); // rand num 1024 for test
3112         Value value;
3113         DistributedDBToolsUnitTest::GetRandomKeyValue(value, 256u);
3114         EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
3115     }
3116 
3117     /**
3118      * @tc.steps: step3. deviceA call sync and wait
3119      * @tc.expected: step3. sync should return OK.
3120      */
3121     std::map<std::string, DBStatus> result;
3122     std::vector<uint8_t> prefixKey({'a', 'b'});
3123     Query query = Query::Select().PrefixKey(prefixKey);
3124     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query) == OK);
3125 
3126     /**
3127      * @tc.expected: step4. onComplete should be called, and status is time_out
3128      */
3129     ASSERT_TRUE(result.size() == devices.size());
3130     for (const auto &pair : result) {
3131         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
3132         EXPECT_TRUE(pair.second == OK);
3133     }
3134     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
3135 }
3136 
3137 /**
3138  * @tc.name: ReSetWatchDogTest001
3139  * @tc.desc: trigger resetWatchDog while pull
3140  * @tc.type: FUNC
3141  * @tc.require: AR000CKRTD AR000CQE0E
3142  * @tc.author: zhuwentao
3143  */
3144 HWTEST_F(DistributedDBSingleVerP2PSyncTest, ReSetWaterDogTest001, TestSize.Level3)
3145 {
3146     /**
3147      * @tc.steps: step1. put 10 key/value
3148      * @tc.expected: step1, put return OK.
3149      */
3150     for (int i = 0; i < 5; i++) {
3151         Key key = DistributedDBToolsUnitTest::GetRandPrefixKey({'a', 'b'}, 1024); // rand num 1024 for test
3152         Value value;
3153         DistributedDBToolsUnitTest::GetRandomKeyValue(value, 10 * 50 * 1024u);
3154         EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
3155     }
3156     /**
3157      * @tc.steps: step1. SetDeviceMtuSize
3158      * @tc.expected: step1, return OK.
3159      */
3160     g_communicatorAggregator->SetDeviceMtuSize(DEVICE_A, 50 * 1024u); // 4k
3161     g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, 50 * 1024u); // 4k
3162     /**
3163      * @tc.steps: step2. deviceA,deviceB sync to each other at same time
3164      * @tc.expected: step2. sync should return OK.
3165      */
3166     g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, true);
3167     g_communicatorAggregator->SetDeviceMtuSize(DEVICE_A, 5 * 1024u * 1024u); // 4k
3168     g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, 5 * 1024u * 1024u); // 4k
3169 }
3170 
3171 /**
3172   * @tc.name: RebuildSync001
3173   * @tc.desc: rebuild db and sync again
3174   * @tc.type: FUNC
3175   * @tc.require:
3176   * @tc.author: zhuwentao
3177   */
3178 HWTEST_F(DistributedDBSingleVerP2PSyncTest, RebuildSync001, TestSize.Level3)
3179 {
3180     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
3181     /**
3182      * @tc.steps: step1. sync deviceB data to A and check data
3183      * * @tc.expected: step1. interface return ok
3184     */
3185     Key key1 = {'1'};
3186     Key key2 = {'2'};
3187     Value value = {'1'};
3188     Timestamp currentTime;
3189     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
3190     EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
3191     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
3192     EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
3193     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
3194 
3195     Value actualValue;
3196     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
3197     EXPECT_EQ(actualValue, value);
3198     actualValue.clear();
3199     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
3200     EXPECT_EQ(actualValue, value);
3201     /**
3202      * @tc.steps: step2. delete db and rebuild
3203      * * @tc.expected: step2. interface return ok
3204     */
3205     g_mgr.CloseKvStore(g_kvDelegatePtr);
3206     g_kvDelegatePtr = nullptr;
3207     ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
3208     KvStoreNbDelegate::Option option;
3209     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
3210     ASSERT_TRUE(g_kvDelegateStatus == OK);
3211     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
3212     /**
3213      * @tc.steps: step3. sync to device A again
3214      * * @tc.expected: step3. sync ok
3215     */
3216     value = {'2'};
3217     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
3218     EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
3219     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
3220     /**
3221      * @tc.steps: step4. check data in device A
3222      * * @tc.expected: step4. check ok
3223     */
3224     actualValue.clear();
3225     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
3226     EXPECT_EQ(actualValue, value);
3227 }
3228 
3229 /**
3230   * @tc.name: RebuildSync002
3231   * @tc.desc: test clear remote data when receive data
3232   * @tc.type: FUNC
3233   * @tc.require:
3234   * @tc.author: zhuwentao
3235   */
3236 HWTEST_F(DistributedDBSingleVerP2PSyncTest, RebuildSync002, TestSize.Level1)
3237 {
3238     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
3239     std::vector<std::string> devices;
3240     devices.push_back(g_deviceB->GetDeviceId());
3241     /**
3242      * @tc.steps: step1. device A SET_WIPE_POLICY
3243      * * @tc.expected: step1. interface return ok
3244     */
3245     int pragmaData = 2; // 2 means enable
3246     PragmaData input = static_cast<PragmaData>(&pragmaData);
3247     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_WIPE_POLICY, input) == OK);
3248     /**
3249      * @tc.steps: step2. sync deviceB data to A and check data
3250      * * @tc.expected: step2. interface return ok
3251     */
3252     Key key1 = {'1'};
3253     Key key2 = {'2'};
3254     Key key3 = {'3'};
3255     Key key4 = {'4'};
3256     Value value = {'1'};
3257     Timestamp currentTime;
3258     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
3259     EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
3260     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
3261     EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
3262     EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
3263     /**
3264      * @tc.steps: step3. deviceA call pull sync
3265      * @tc.expected: step3. sync should return OK.
3266      */
3267     std::map<std::string, DBStatus> result;
3268     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result) == OK);
3269 
3270     /**
3271      * @tc.expected: step4. onComplete should be called, check data
3272      */
3273     ASSERT_TRUE(result.size() == devices.size());
3274     for (const auto &pair : result) {
3275         EXPECT_TRUE(pair.second == OK);
3276     }
3277     Value actualValue;
3278     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
3279     EXPECT_EQ(actualValue, value);
3280     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
3281     EXPECT_EQ(actualValue, value);
3282     /**
3283      * @tc.steps: step5. device B rebuild and put some data
3284      * * @tc.expected: step5. rebuild ok
3285     */
3286     if (g_deviceB != nullptr) {
3287         delete g_deviceB;
3288         g_deviceB = nullptr;
3289     }
3290     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
3291     ASSERT_TRUE(g_deviceB != nullptr);
3292     VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
3293     ASSERT_TRUE(syncInterfaceB != nullptr);
3294     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
3295     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
3296     EXPECT_EQ(g_deviceB->PutData(key3, value, currentTime, 0), E_OK);
3297     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
3298     EXPECT_EQ(g_deviceB->PutData(key4, value, currentTime, 0), E_OK);
3299     /**
3300      * @tc.steps: step6. sync to device A again and check data
3301      * * @tc.expected: step6. sync ok
3302     */
3303     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
3304     EXPECT_EQ(g_kvDelegatePtr->Get(key3, actualValue), OK);
3305     EXPECT_EQ(actualValue, value);
3306     EXPECT_EQ(g_kvDelegatePtr->Get(key4, actualValue), OK);
3307     EXPECT_EQ(actualValue, value);
3308     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
3309     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), NOT_FOUND);
3310 }
3311 
3312 /**
3313   * @tc.name: RebuildSync003
3314   * @tc.desc: test clear history data when receive ack
3315   * @tc.type: FUNC
3316   * @tc.require:
3317   * @tc.author: zhuwentao
3318   */
3319 HWTEST_F(DistributedDBSingleVerP2PSyncTest, RebuildSync003, TestSize.Level1)
3320 {
3321     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
3322     /**
3323      * @tc.steps: step1. sync deviceB data to A and check data
3324      * * @tc.expected: step1. interface return ok
3325     */
3326     Key key1 = {'1'};
3327     Key key2 = {'2'};
3328     Key key3 = {'3'};
3329     Key key4 = {'4'};
3330     Value value = {'1'};
3331     EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
3332     EXPECT_EQ(g_deviceB->PutData(key2, value, 2u, 0), E_OK); // 2: timestamp
3333     EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
3334     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
3335     Value actualValue;
3336     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
3337     EXPECT_EQ(actualValue, value);
3338     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
3339     EXPECT_EQ(actualValue, value);
3340     VirtualDataItem item;
3341     EXPECT_EQ(g_deviceB->GetData(key3, item), E_OK);
3342     EXPECT_EQ(item.value, value);
3343     /**
3344      * @tc.steps: step2. device B sync to device A,but make it failed
3345      * * @tc.expected: step2. interface return ok
3346     */
3347     EXPECT_EQ(g_deviceB->PutData(key4, value, 3u, 0), E_OK); // 3: timestamp
3348     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_A, DATA_SYNC_MESSAGE);
3349     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
3350     /**
3351      * @tc.steps: step3. device B set delay send time
3352      * * @tc.expected: step3. interface return ok
3353     */
3354     std::set<std::string> delayDevice = {DEVICE_B};
3355     g_communicatorAggregator->SetSendDelayInfo(3000u, DATA_SYNC_MESSAGE, 1u, 0u, delayDevice); // delay 3000ms one time
3356     /**
3357      * @tc.steps: step4. device A rebuilt, device B push data to A and set clear remote data mark into context after 1s
3358      * * @tc.expected: step4. interface return ok
3359     */
__anone9df75cb2202() 3360     std::thread thread1([]() {
3361         std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // wait 1s
3362         g_deviceB->SetClearRemoteStaleData(true);
3363         if (g_kvDelegatePtr != nullptr) {
3364             g_mgr.CloseKvStore(g_kvDelegatePtr);
3365             g_kvDelegatePtr = nullptr;
3366         }
3367         ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
3368         KvStoreNbDelegate::Option option;
3369         g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
3370         ASSERT_TRUE(g_kvDelegateStatus == OK);
3371         ASSERT_TRUE(g_kvDelegatePtr != nullptr);
3372         std::map<std::string, DBStatus> result;
3373         std::vector<std::string> devices = {g_deviceB->GetDeviceId()};
3374         g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
3375         ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
3376     });
3377     /**
3378      * @tc.steps: step5. device B sync to A, make it clear history data and check data
3379      * * @tc.expected: step5. interface return ok
3380     */
3381     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
3382     thread1.join();
3383     EXPECT_EQ(g_deviceB->GetData(key3, item), -E_NOT_FOUND);
3384     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
3385     EXPECT_EQ(actualValue, value);
3386     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
3387     EXPECT_EQ(actualValue, value);
3388     g_communicatorAggregator->ResetSendDelayInfo();
3389 }
3390 
3391 /**
3392   * @tc.name: GetSyncDataFail001
3393   * @tc.desc: test get sync data failed when sync
3394   * @tc.type: FUNC
3395   * @tc.require:
3396   * @tc.author: zhuwentao
3397   */
3398 HWTEST_F(DistributedDBSingleVerP2PSyncTest, GetSyncDataFail001, TestSize.Level1)
3399 {
3400     /**
3401      * @tc.steps: step1. device B set get data errCode control and put some data
3402      * * @tc.expected: step1. interface return ok
3403     */
3404     g_deviceB->SetGetDataErrCode(1, -E_BUSY, true);
3405     Key key1 = {'1'};
3406     Value value = {'1'};
3407     EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
3408     /**
3409      * @tc.steps: step2. device B sync to device A and check data
3410      * * @tc.expected: step2. interface return ok
3411     */
3412     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
3413     Value actualValue;
3414     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
3415     g_deviceB->ResetDataControl();
3416 }