• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 
18 #include "ability_sync.h"
19 #include "distributeddb_data_generate_unit_test.h"
20 #include "distributeddb_tools_unit_test.h"
21 #include "kv_store_nb_delegate_impl.h"
22 #include "kv_virtual_device.h"
23 #include "platform_specific.h"
24 #include "process_system_api_adapter_impl.h"
25 #include "single_ver_data_packet.h"
26 #include "virtual_communicator_aggregator.h"
27 
28 using namespace testing::ext;
29 using namespace DistributedDB;
30 using namespace DistributedDBUnitTest;
31 using namespace std;
32 
33 namespace {
34     string g_testDir;
35     const string STORE_ID = "kv_stroe_sync_check_test";
36     const std::string DEVICE_B = "deviceB";
37     const std::string DEVICE_C = "deviceC";
38     const int LOCAL_WATER_MARK_NOT_INIT = 0xaa;
39     const int EIGHT_HUNDRED = 800;
40     const int NORMAL_SYNC_SEND_REQUEST_CNT = 3;
41     const int TWO_CNT = 2;
42     const int SLEEP_MILLISECONDS = 500;
43     const int TEN_SECONDS = 10;
44     const int THREE_HUNDRED = 300;
45     const int WAIT_30_SECONDS = 30000;
46     const int WAIT_40_SECONDS = 40000;
47     const int TIMEOUT_6_SECONDS = 6000;
48 
49     KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
50     KvStoreConfig g_config;
51     DistributedDBToolsUnitTest g_tool;
52     DBStatus g_kvDelegateStatus = INVALID_ARGS;
53     KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
54     VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
55     KvVirtualDevice* g_deviceB = nullptr;
56     KvVirtualDevice* g_deviceC = nullptr;
57     VirtualSingleVerSyncDBInterface *g_syncInterfaceB = nullptr;
58     VirtualSingleVerSyncDBInterface *g_syncInterfaceC = nullptr;
59 
60     // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
61     auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
62         placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
63 #ifndef LOW_LEVEL_MEM_DEV
64     const int KEY_LEN = 20; // 20 Bytes
65     const int VALUE_LEN = 4 * 1024 * 1024; // 4MB
66     const int ENTRY_NUM = 2; // 16 entries
67 #endif
68 
69 class DistributedDBSingleVerP2PSyncCheckTest : public testing::Test {
70 public:
71     static void SetUpTestCase(void);
72     static void TearDownTestCase(void);
73     void SetUp();
74     void TearDown();
75 };
76 
SetUpTestCase(void)77 void DistributedDBSingleVerP2PSyncCheckTest::SetUpTestCase(void)
78 {
79     /**
80      * @tc.setup: Init datadir and Virtual Communicator.
81      */
82     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
83     g_config.dataDir = g_testDir;
84     g_mgr.SetKvStoreConfig(g_config);
85 
86     string dir = g_testDir + "/single_ver";
87     DIR* dirTmp = opendir(dir.c_str());
88     if (dirTmp == nullptr) {
89         OS::MakeDBDirectory(dir);
90     } else {
91         closedir(dirTmp);
92     }
93 
94     g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
95     ASSERT_TRUE(g_communicatorAggregator != nullptr);
96     RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
97 
98     std::shared_ptr<ProcessSystemApiAdapterImpl> g_adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
99     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(g_adapter);
100 }
101 
TearDownTestCase(void)102 void DistributedDBSingleVerP2PSyncCheckTest::TearDownTestCase(void)
103 {
104     /**
105      * @tc.teardown: Release virtual Communicator and clear data dir.
106      */
107     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
108         LOGE("rm test db files error!");
109     }
110     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
111     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
112 }
113 
SetUp(void)114 void DistributedDBSingleVerP2PSyncCheckTest::SetUp(void)
115 {
116     DistributedDBToolsUnitTest::PrintTestCaseInfo();
117     /**
118      * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
119      */
120     KvStoreNbDelegate::Option option;
121     option.secOption.securityLabel = SecurityLabel::S3;
122     option.secOption.securityFlag = SecurityFlag::SECE;
123     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
124     ASSERT_TRUE(g_kvDelegateStatus == OK);
125     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
126     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
127     ASSERT_TRUE(g_deviceB != nullptr);
128     g_syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
129     ASSERT_TRUE(g_syncInterfaceB != nullptr);
130     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, g_syncInterfaceB), E_OK);
131     SecurityOption virtualOption;
132     virtualOption.securityLabel = option.secOption.securityLabel;
133     virtualOption.securityFlag = option.secOption.securityFlag;
134     g_syncInterfaceB->SetSecurityOption(virtualOption);
135 
136     g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
137     ASSERT_TRUE(g_deviceC != nullptr);
138     g_syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
139     ASSERT_TRUE(g_syncInterfaceC != nullptr);
140     ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, g_syncInterfaceC), E_OK);
141     g_syncInterfaceC->SetSecurityOption(virtualOption);
142     RuntimeContext::GetInstance()->ClearAllDeviceTimeInfo();
143 }
144 
TearDown(void)145 void DistributedDBSingleVerP2PSyncCheckTest::TearDown(void)
146 {
147     /**
148      * @tc.teardown: Release device A, B, C
149      */
150     if (g_kvDelegatePtr != nullptr) {
151         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
152         g_kvDelegatePtr = nullptr;
153         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
154         LOGD("delete kv store status %d", status);
155         ASSERT_TRUE(status == OK);
156     }
157     if (g_deviceB != nullptr) {
158         delete g_deviceB;
159         g_deviceB = nullptr;
160     }
161     if (g_deviceC != nullptr) {
162         delete g_deviceC;
163         g_deviceC = nullptr;
164     }
165     if (g_communicatorAggregator != nullptr) {
166         g_communicatorAggregator->RegOnDispatch(nullptr);
167     }
168 }
169 
170 /**
171  * @tc.name: sec option check Sync 001
172  * @tc.desc: if sec option not equal, forbid sync
173  * @tc.type: FUNC
174  * @tc.require: AR000EV1G6
175  * @tc.author: wangchuanqing
176  */
177 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck001, TestSize.Level1)
178 {
179     DBStatus status = OK;
180     std::vector<std::string> devices;
181     devices.push_back(g_deviceB->GetDeviceId());
182     devices.push_back(g_deviceC->GetDeviceId());
183 
184     /**
185      * @tc.steps: step1. deviceA put {k1, v1}
186      */
187     Key key = {'1'};
188     Value value = {'1'};
189     status = g_kvDelegatePtr->Put(key, value);
190     ASSERT_TRUE(status == OK);
191 
192     ASSERT_TRUE(g_syncInterfaceB != nullptr);
193     ASSERT_TRUE(g_syncInterfaceC != nullptr);
194     SecurityOption secOption{SecurityLabel::S4, SecurityFlag::ECE};
195     g_syncInterfaceB->SetSecurityOption(secOption);
196     g_syncInterfaceC->SetSecurityOption(secOption);
197 
198     /**
199      * @tc.steps: step2. deviceA call sync and wait
200      * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
201      */
202     std::map<std::string, DBStatus> result;
203     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
204     ASSERT_TRUE(status == OK);
205 
206     ASSERT_TRUE(result.size() == devices.size());
207     for (const auto &pair : result) {
208         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
209         EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
210     }
211     VirtualDataItem item;
212     g_deviceB->GetData(key, item);
213     EXPECT_TRUE(item.value.empty());
214     g_deviceC->GetData(key, item);
215     EXPECT_TRUE(item.value.empty());
216 }
217 
218 /**
219  * @tc.name: sec option check Sync 002
220  * @tc.desc: if sec option not equal, forbid sync
221  * @tc.type: FUNC
222  * @tc.require: AR000EV1G6
223  * @tc.author: wangchuanqing
224  */
225 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck002, TestSize.Level1)
226 {
227     DBStatus status = OK;
228     std::vector<std::string> devices;
229     devices.push_back(g_deviceB->GetDeviceId());
230     devices.push_back(g_deviceC->GetDeviceId());
231 
232     /**
233      * @tc.steps: step1. deviceA put {k1, v1}
234      */
235     Key key = {'1'};
236     Value value = {'1'};
237     status = g_kvDelegatePtr->Put(key, value);
238     ASSERT_TRUE(status == OK);
239 
240     ASSERT_TRUE(g_syncInterfaceC != nullptr);
241     SecurityOption secOption{SecurityLabel::S4, SecurityFlag::ECE};
242     g_syncInterfaceC->SetSecurityOption(secOption);
243     secOption.securityLabel = SecurityLabel::S3;
244     secOption.securityFlag = SecurityFlag::SECE;
245     g_syncInterfaceB->SetSecurityOption(secOption);
246 
247     /**
248      * @tc.steps: step2. deviceA call sync and wait
249      * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
250      */
251     std::map<std::string, DBStatus> result;
252     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
253     ASSERT_TRUE(status == OK);
254 
255     ASSERT_TRUE(result.size() == devices.size());
256     for (const auto &pair : result) {
257         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
258         if (pair.first == DEVICE_B) {
259             EXPECT_TRUE(pair.second == OK);
260         } else {
261             EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
262         }
263     }
264     VirtualDataItem item;
265     g_deviceC->GetData(key, item);
266     EXPECT_TRUE(item.value.empty());
267     g_deviceB->GetData(key, item);
268     EXPECT_TRUE(item.value == value);
269 }
270 
271 /**
272  * @tc.name: sec option check Sync 003
273  * @tc.desc: if sec option equal, check not pass, forbid sync
274  * @tc.type: FUNC
275  * @tc.require: AR000EV1G6
276  * @tc.author: zhangqiquan
277  */
278 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck003, TestSize.Level1)
279 {
280     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
281     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon728505270202(const std::string &, const SecurityOption &) 282     adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
283         return false;
284     });
285     /**
286      * @tc.steps: step1. record packet
287      * @tc.expected: step1. sync should failed in source.
288      */
289     std::atomic<int> messageCount = 0;
__anon728505270302(const std::string &, Message *) 290     g_communicatorAggregator->RegOnDispatch([&messageCount](const std::string &, Message *) {
291         messageCount++;
292     });
293     /**
294      * @tc.steps: step2. deviceA call sync and wait
295      * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
296      */
297     DBStatus status = OK;
298     std::vector<std::string> devices;
299     devices.push_back(g_deviceB->GetDeviceId());
300     std::map<std::string, DBStatus> result;
301     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
302     EXPECT_EQ(status, OK);
303     EXPECT_EQ(messageCount, 4); // 4 = 2 time sync + 2 ability sync
304     for (const auto &pair : result) {
305         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
306         EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
307     }
308     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
309     g_communicatorAggregator->RegOnDispatch(nullptr);
310 }
311 
312 /**
313  * @tc.name: sec option check Sync 004
314  * @tc.desc: memory db not check device security
315  * @tc.type: FUNC
316  * @tc.require:
317  * @tc.author: zhangqiquan
318  */
319 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck004, TestSize.Level1)
320 {
321     ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
322     g_kvDelegatePtr = nullptr;
323     KvStoreNbDelegate::Option option;
324     option.secOption.securityLabel = SecurityLabel::NOT_SET;
325     option.isMemoryDb = true;
326     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
327     ASSERT_TRUE(g_kvDelegateStatus == OK);
328     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
329 
330     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
331     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon728505270402(const std::string &, const SecurityOption &) 332     adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
333         return false;
334     });
__anon728505270502(const std::string &, SecurityOption &securityOption) 335     adapter->ForkGetSecurityOption([](const std::string &, SecurityOption &securityOption) {
336         securityOption.securityLabel = NOT_SET;
337         return OK;
338     });
__anon728505270602(SecurityOption &) 339     g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &) {
340         return -E_NOT_SUPPORT;
341     });
342 
343     std::vector<std::string> devices;
344     devices.push_back(g_deviceB->GetDeviceId());
345     std::map<std::string, DBStatus> result;
346     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
347     EXPECT_EQ(status, OK);
348     for (const auto &pair : result) {
349         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
350         EXPECT_TRUE(pair.second == OK);
351     }
352 
353     adapter->ForkCheckDeviceSecurityAbility(nullptr);
354     adapter->ForkGetSecurityOption(nullptr);
355     g_syncInterfaceB->ForkGetSecurityOption(nullptr);
356 }
357 
358 /**
359  * @tc.name: sec option check Sync 005
360  * @tc.desc: if sec option equal, check not pass, forbid sync
361  * @tc.type: FUNC
362  * @tc.require:
363  * @tc.author: zhangqiquan
364  */
365 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck005, TestSize.Level1)
366 {
367     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
368     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon728505270702(SecurityOption &option) 369     g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &option) {
370         option.securityLabel = NOT_SET;
371         return E_OK;
372     });
__anon728505270802(const std::string &, SecurityOption &securityOption) 373     adapter->ForkGetSecurityOption([](const std::string &, SecurityOption &securityOption) {
374         securityOption.securityLabel = NOT_SET;
375         return OK;
376     });
377 
378     std::vector<std::string> devices;
379     devices.push_back(g_deviceB->GetDeviceId());
380     std::map<std::string, DBStatus> result;
381     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
382     EXPECT_EQ(status, OK);
383     for (const auto &pair : result) {
384         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
385         EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
386     }
387 
388     adapter->ForkCheckDeviceSecurityAbility(nullptr);
389     adapter->ForkGetSecurityOption(nullptr);
390     g_syncInterfaceB->ForkGetSecurityOption(nullptr);
391 }
392 
393 /**
394  * @tc.name: sec option check Sync 006
395  * @tc.desc: memory db not check device security
396  * @tc.type: FUNC
397  * @tc.require:
398  * @tc.author: zhangqiquan
399  */
400 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck006, TestSize.Level0)
401 {
402     ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
403     ASSERT_EQ(g_mgr.DeleteKvStore(STORE_ID), OK);
404     g_kvDelegatePtr = nullptr;
405     KvStoreNbDelegate::Option option;
406     option.secOption.securityLabel = SecurityLabel::S1;
407     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
408     ASSERT_TRUE(g_kvDelegateStatus == OK);
409     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
410 
411     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
412     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon728505270902(const std::string &, const SecurityOption &) 413     adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
414         return true;
415     });
__anon728505270a02(const std::string &, SecurityOption &securityOption) 416     adapter->ForkGetSecurityOption([](const std::string &, SecurityOption &securityOption) {
417         securityOption.securityLabel = S1;
418         return OK;
419     });
__anon728505270b02(SecurityOption &option) 420     g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &option) {
421         option.securityLabel = SecurityLabel::S0;
422         return E_OK;
423     });
424 
425     std::vector<std::string> devices;
426     devices.push_back(g_deviceB->GetDeviceId());
427     std::map<std::string, DBStatus> result;
428     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
429     EXPECT_EQ(status, OK);
430     for (const auto &pair : result) {
431         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
432         EXPECT_TRUE(pair.second == OK);
433     }
434 
435     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(std::make_shared<ProcessSystemApiAdapterImpl>());
436     g_syncInterfaceB->ForkGetSecurityOption(nullptr);
437 }
438 
439 /**
440  * @tc.name: sec option check Sync 007
441  * @tc.desc: sync should send security option
442  * @tc.type: FUNC
443  * @tc.require:
444  * @tc.author: zhangqiquan
445  */
446 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck007, TestSize.Level0)
447 {
448     /**
449      * @tc.steps: step1. fork check device security ability
450      * @tc.expected: step1. check param option should be S3 SECE.
451      */
452     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
453     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon728505270c02(const std::string &, const SecurityOption &option) 454     adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &option) {
455         EXPECT_EQ(option.securityLabel, SecurityLabel::S3);
456         EXPECT_EQ(option.securityFlag, SecurityFlag::SECE);
457         return true;
458     });
459     /**
460      * @tc.steps: step2. sync twice
461      * @tc.expected: step2. sync success.
462      */
463     std::vector<std::string> devices;
464     devices.push_back(g_deviceB->GetDeviceId());
465     std::map<std::string, DBStatus> result;
466     g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
467     auto status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
468     ASSERT_TRUE(status == OK);
469     ASSERT_TRUE(result.size() == devices.size());
470     for (const auto &pair : result) {
471         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
472         EXPECT_TRUE(pair.second == OK);
473     }
474     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
475 }
476 
477 /**
478  * @tc.name: SecOptionCheck008
479  * @tc.desc: pull compress sync when check device ability fail
480  * @tc.type: FUNC
481  * @tc.require:
482  * @tc.author: zhangqiquan
483  */
484 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck008, TestSize.Level0)
485 {
486     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
487     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
488     auto deviceB = g_deviceB->GetDeviceId();
__anon728505270d02(const std::string &dev, const SecurityOption &) 489     adapter->ForkCheckDeviceSecurityAbility([deviceB](const std::string &dev, const SecurityOption &) {
490         if (dev != "real_device") {
491             return true;
492         }
493         return false;
494     });
__anon728505270e02(SecurityOption &option) 495     g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &option) {
496         option.securityLabel = SecurityLabel::S3;
497         option.securityFlag = SecurityFlag::SECE;
498         return E_OK;
499     });
500     g_syncInterfaceB->SetCompressSync(true);
501     std::vector<std::string> devices;
502     devices.push_back(deviceB);
503     std::map<std::string, DBStatus> result;
504     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
505     EXPECT_EQ(status, OK);
506     for (const auto &pair : result) {
507         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
508         EXPECT_EQ(pair.second, SECURITY_OPTION_CHECK_ERROR);
509     }
510 
511     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(std::make_shared<ProcessSystemApiAdapterImpl>());
512     g_syncInterfaceB->ForkGetSecurityOption(nullptr);
513     g_syncInterfaceB->SetCompressSync(false);
514 }
515 
516 #ifndef LOW_LEVEL_MEM_DEV
517 /**
518  * @tc.name: BigDataSync001
519  * @tc.desc: big data sync push mode.
520  * @tc.type: FUNC
521  * @tc.require: AR000F3OOU
522  * @tc.author: wangchuanqing
523  */
524 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync001, TestSize.Level1)
525 {
526     DBStatus status = OK;
527     std::vector<std::string> devices;
528     devices.push_back(g_deviceB->GetDeviceId());
529     devices.push_back(g_deviceC->GetDeviceId());
530 
531     /**
532      * @tc.steps: step1. deviceA put 16 bigData
533      */
534     std::vector<Entry> entries;
535     std::vector<Key> keys;
536     DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
537     for (const auto &entry : entries) {
538         status = g_kvDelegatePtr->Put(entry.key, entry.value);
539         ASSERT_TRUE(status == OK);
540     }
541 
542     /**
543      * @tc.steps: step2. deviceA call sync and wait
544      * @tc.expected: step2. sync should return OK.
545      */
546     std::map<std::string, DBStatus> result;
547     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
548     ASSERT_TRUE(status == OK);
549 
550     /**
551      * @tc.expected: step2. onComplete should be called, DeviceB,C have {k1,v1}
552      */
553     ASSERT_TRUE(result.size() == devices.size());
554     for (const auto &pair : result) {
555         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
556         EXPECT_TRUE(pair.second == OK);
557     }
558     VirtualDataItem item;
559     for (const auto &entry : entries) {
560         item.value.clear();
561         g_deviceB->GetData(entry.key, item);
562         EXPECT_TRUE(item.value == entry.value);
563         item.value.clear();
564         g_deviceC->GetData(entry.key, item);
565         EXPECT_TRUE(item.value == entry.value);
566     }
567 }
568 
569 /**
570  * @tc.name: BigDataSync002
571  * @tc.desc: big data sync pull mode.
572  * @tc.type: FUNC
573  * @tc.require: AR000F3OOU
574  * @tc.author: wangchuanqing
575  */
576 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync002, TestSize.Level1)
577 {
578     DBStatus status = OK;
579     std::vector<std::string> devices;
580     devices.push_back(g_deviceB->GetDeviceId());
581     devices.push_back(g_deviceC->GetDeviceId());
582 
583     /**
584      * @tc.steps: step1. deviceA deviceB put bigData
585      */
586     std::vector<Entry> entries;
587     std::vector<Key> keys;
588     DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
589 
590     for (uint32_t i = 0; i < entries.size(); i++) {
591         if (i % 2 == 0) {
592             g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
593         } else {
594             g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
595         }
596     }
597 
598     /**
599      * @tc.steps: step3. deviceA call pull sync
600      * @tc.expected: step3. sync should return OK.
601      */
602     std::map<std::string, DBStatus> result;
603     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
604     ASSERT_TRUE(status == OK);
605 
606     /**
607      * @tc.expected: step3. onComplete should be called, DeviceA have all bigData
608      */
609     ASSERT_TRUE(result.size() == devices.size());
610     for (const auto &pair : result) {
611         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
612         EXPECT_TRUE(pair.second == OK);
613     }
614     for (const auto &entry : entries) {
615         Value value;
616         EXPECT_EQ(g_kvDelegatePtr->Get(entry.key, value), OK);
617         EXPECT_EQ(value, entry.value);
618     }
619 }
620 
621 /**
622  * @tc.name: BigDataSync003
623  * @tc.desc: big data sync pushAndPull mode.
624  * @tc.type: FUNC
625  * @tc.require: AR000F3OOV
626  * @tc.author: wangchuanqing
627  */
628 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync003, TestSize.Level1)
629 {
630     DBStatus status = OK;
631     std::vector<std::string> devices;
632     devices.push_back(g_deviceB->GetDeviceId());
633     devices.push_back(g_deviceC->GetDeviceId());
634 
635     /**
636      * @tc.steps: step1. deviceA deviceB put bigData
637      */
638     std::vector<Entry> entries;
639     std::vector<Key> keys;
640     DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
641 
642     for (uint32_t i = 0; i < entries.size(); i++) {
643         if (i % 3 == 0) { // 0 3 6 9 12 15 for deivec B
644             g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
645         } else if (i % 3 == 1) { // 1 4 7 10 13 16 for device C
646             g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
647         } else { // 2 5 8 11 14 for device A
648             status = g_kvDelegatePtr->Put(entries[i].key, entries[i].value);
649             ASSERT_TRUE(status == OK);
650         }
651     }
652 
653     /**
654      * @tc.steps: step3. deviceA call pushAndpull sync
655      * @tc.expected: step3. sync should return OK.
656      */
657     std::map<std::string, DBStatus> result;
658     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
659     ASSERT_TRUE(status == OK);
660 
661     /**
662      * @tc.expected: step3. onComplete should be called, DeviceA have all bigData
663      * deviceB and deviceC has deviceA data
664      */
665     ASSERT_TRUE(result.size() == devices.size());
666     for (const auto &pair : result) {
667         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
668         EXPECT_TRUE(pair.second == OK);
669     }
670 
671     VirtualDataItem item;
672     for (uint32_t i = 0; i < entries.size(); i++) {
673         Value value;
674         EXPECT_EQ(g_kvDelegatePtr->Get(entries[i].key, value), OK);
675         EXPECT_EQ(value, entries[i].value);
676 
677         if (i % 3 == 2) { // 2 5 8 11 14 for device A
678         item.value.clear();
679         g_deviceB->GetData(entries[i].key, item);
680         EXPECT_TRUE(item.value == entries[i].value);
681         item.value.clear();
682         g_deviceC->GetData(entries[i].key, item);
683         EXPECT_TRUE(item.value == entries[i].value);
684         }
685     }
686 }
687 #endif
688 
689 /**
690  * @tc.name: PushFinishedNotify 001
691  * @tc.desc: Test remote device push finished notify function.
692  * @tc.type: FUNC
693  * @tc.require: AR000CQS3S
694  * @tc.author: xushaohua
695  */
696 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, PushFinishedNotify001, TestSize.Level1)
697 {
698     std::vector<std::string> devices;
699     devices.push_back(g_deviceB->GetDeviceId());
700 
701     /**
702      * @tc.steps: step1. deviceA call SetRemotePushFinishedNotify
703      * @tc.expected: step1. set should return OK.
704      */
705     int pushfinishedFlag = 0;
706     DBStatus status = g_kvDelegatePtr->SetRemotePushFinishedNotify(
__anon728505270f02(const RemotePushNotifyInfo &info) 707         [&pushfinishedFlag](const RemotePushNotifyInfo &info) {
708             EXPECT_TRUE(info.deviceId == DEVICE_B);
709             pushfinishedFlag = 1;
710     });
711     ASSERT_EQ(status, OK);
712 
713     /**
714      * @tc.steps: step2. deviceB put k2, v2, and deviceA pull from deviceB
715      * @tc.expected: step2. deviceA can not receive push finished notify
716      */
717     EXPECT_EQ(g_kvDelegatePtr->Put(KEY_2, VALUE_2), OK);
718     std::map<std::string, DBStatus> result;
719     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
720     EXPECT_TRUE(status == OK);
721     EXPECT_EQ(pushfinishedFlag, 0);
722     pushfinishedFlag = 0;
723 
724     /**
725      * @tc.steps: step3. deviceB put k3, v3, and deviceA push and pull to deviceB
726      * @tc.expected: step3. deviceA can not receive push finished notify
727      */
728     EXPECT_EQ(g_kvDelegatePtr->Put(KEY_3, VALUE_3), OK);
729     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
730     EXPECT_TRUE(status == OK);
731     EXPECT_EQ(pushfinishedFlag, 0);
732     pushfinishedFlag = 0;
733 
734     /**
735      * @tc.steps: step4. deviceA call SetRemotePushFinishedNotify to reset notify
736      * @tc.expected: step4. set should return OK.
737      */
__anon728505271002(const RemotePushNotifyInfo &info) 738     status = g_kvDelegatePtr->SetRemotePushFinishedNotify([&pushfinishedFlag](const RemotePushNotifyInfo &info) {
739         EXPECT_TRUE(info.deviceId == DEVICE_B);
740         pushfinishedFlag = 2;
741     });
742     ASSERT_EQ(status, OK);
743 
744     /**
745      * @tc.steps: step5. deviceA call SetRemotePushFinishedNotify set null to unregist
746      * @tc.expected: step5. set should return OK.
747      */
748     status = g_kvDelegatePtr->SetRemotePushFinishedNotify(nullptr);
749     ASSERT_EQ(status, OK);
750 }
751 
752 namespace {
RegOnDispatchWithDelayAck(bool & errCodeAck,bool & afterErrAck)753 void RegOnDispatchWithDelayAck(bool &errCodeAck, bool &afterErrAck)
754 {
755     // just delay the busy ack
756     g_communicatorAggregator->RegOnDispatch([&errCodeAck, &afterErrAck](const std::string &dev, Message *inMsg) {
757         if (dev != g_deviceB->GetDeviceId()) {
758             return;
759         }
760         auto *packet = inMsg->GetObject<DataAckPacket>();
761         if (packet != nullptr && packet->GetRecvCode() == -E_BUSY) {
762             errCodeAck = true;
763             while (!afterErrAck) {
764             }
765             LOGW("NOW SEND BUSY ACK");
766         } else if (errCodeAck) {
767             afterErrAck = true;
768             std::this_thread::sleep_for(std::chrono::seconds(1));
769         }
770     });
771 }
772 
RegOnDispatchWithOffline(bool & offlineFlag,bool & invalid,condition_variable & conditionOffline)773 void RegOnDispatchWithOffline(bool &offlineFlag, bool &invalid, condition_variable &conditionOffline)
774 {
775     g_communicatorAggregator->RegOnDispatch([&offlineFlag, &invalid, &conditionOffline](
776                                                 const std::string &dev, Message *inMsg) {
777         auto *packet = inMsg->GetObject<DataAckPacket>();
778         if (dev != DEVICE_B) {
779             if (packet != nullptr && (packet->GetRecvCode() == LOCAL_WATER_MARK_NOT_INIT)) {
780                 offlineFlag = true;
781                 conditionOffline.notify_all();
782                 LOGW("[Dispatch] NOTIFY OFFLINE");
783                 std::this_thread::sleep_for(std::chrono::microseconds(EIGHT_HUNDRED));
784             }
785         } else if (!invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
786             LOGW("[Dispatch] NOW INVALID THIS MSG");
787             inMsg->SetMessageType(TYPE_INVALID);
788             inMsg->SetMessageId(INVALID_MESSAGE_ID);
789             invalid = true;
790         }
791     });
792 }
793 
RegOnDispatchWithInvalidMsg(bool & invalid)794 void RegOnDispatchWithInvalidMsg(bool &invalid)
795 {
796     g_communicatorAggregator->RegOnDispatch([&invalid](
797         const std::string &dev, Message *inMsg) {
798         if (dev == DEVICE_B && !invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
799             LOGW("[Dispatch] NOW INVALID THIS MSG");
800             inMsg->SetMessageType(TYPE_INVALID);
801             inMsg->SetMessageId(INVALID_MESSAGE_ID);
802             invalid = true;
803         }
804     });
805 }
806 
PrepareEnv(vector<std::string> & devices,Key & key,Query & query)807 void PrepareEnv(vector<std::string> &devices, Key &key, Query &query)
808 {
809     /**
810      * @tc.steps: step1. ensure the watermark is no zero and finish timeSync and abilitySync
811      * @tc.expected: step1. should return OK.
812      */
813     Value value = {'1'};
814     std::map<std::string, DBStatus> result;
815     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
816 
817     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result, query);
818     EXPECT_TRUE(status == OK);
819     ASSERT_TRUE(result[g_deviceB->GetDeviceId()] == OK);
820 }
821 
Sync(KvStoreNbDelegate * kvDelegatePtr,vector<std::string> & devices,const DBStatus & targetStatus)822 void Sync(KvStoreNbDelegate *kvDelegatePtr, vector<std::string> &devices, const DBStatus &targetStatus)
823 {
824     std::map<std::string, DBStatus> result;
825     DBStatus status = g_tool.SyncTest(kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result);
826     EXPECT_TRUE(status == OK);
827     for (const auto &deviceId : devices) {
828         ASSERT_TRUE(result[deviceId] == targetStatus);
829     }
830 }
831 
Sync(vector<std::string> & devices,const DBStatus & targetStatus)832 void Sync(vector<std::string> &devices, const DBStatus &targetStatus)
833 {
834     Sync(g_kvDelegatePtr, devices, targetStatus);
835 }
836 
SyncWithQuery(vector<std::string> & devices,const Query & query,const SyncMode & mode,const DBStatus & targetStatus)837 void SyncWithQuery(vector<std::string> &devices, const Query &query, const SyncMode &mode,
838     const DBStatus &targetStatus)
839 {
840     std::map<std::string, DBStatus> result;
841     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, mode, result, query);
842     EXPECT_TRUE(status == OK);
843     for (const auto &deviceId : devices) {
844         ASSERT_EQ(result[deviceId], targetStatus);
845     }
846 }
847 
SyncWithQuery(vector<std::string> & devices,const Query & query,const DBStatus & targetStatus)848 void SyncWithQuery(vector<std::string> &devices, const Query &query, const DBStatus &targetStatus)
849 {
850     SyncWithQuery(devices, query, DistributedDB::SYNC_MODE_PUSH_ONLY, targetStatus);
851 }
852 
SyncWithDeviceOffline(vector<std::string> & devices,Key & key,const Query & query)853 void SyncWithDeviceOffline(vector<std::string> &devices, Key &key, const Query &query)
854 {
855     Value value = {'2'};
856     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
857 
858     /**
859      * @tc.steps: step2. invalid the sync msg
860      * @tc.expected: step2. should return TIME_OUT.
861      */
862     SyncWithQuery(devices, query, TIME_OUT);
863 
864     /**
865      * @tc.steps: step3. device offline when sync
866      * @tc.expected: step3. should return COMM_FAILURE.
867      */
868     SyncWithQuery(devices, query, COMM_FAILURE);
869 }
870 
PrepareWaterMarkError(std::vector<std::string> & devices,Query & query)871 void PrepareWaterMarkError(std::vector<std::string> &devices, Query &query)
872 {
873     /**
874      * @tc.steps: step1. prepare data
875      */
876     devices.push_back(g_deviceB->GetDeviceId());
877     g_deviceB->Online();
878 
879     Key key = {'1'};
880     query = Query::Select().PrefixKey(key);
881     PrepareEnv(devices, key, query);
882 
883     /**
884      * @tc.steps: step2. query sync and set queryWaterMark
885      * @tc.expected: step2. should return OK.
886      */
887     Value value = {'2'};
888     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
889     SyncWithQuery(devices, query, OK);
890 
891     /**
892      * @tc.steps: step3. sync and invalid msg for set local device waterMark
893      * @tc.expected: step3. should return TIME_OUT.
894      */
895     bool invalidMsg = false;
896     RegOnDispatchWithInvalidMsg(invalidMsg);
897     value = {'3'};
898     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
899     Sync(devices, TIME_OUT);
900     g_communicatorAggregator->RegOnDispatch(nullptr);
901 }
902 
RegOnDispatchWithoutDataPacket(std::atomic<int> & messageCount,bool calResponse=false)903 void RegOnDispatchWithoutDataPacket(std::atomic<int> &messageCount, bool calResponse = false)
904 {
905     g_communicatorAggregator->RegOnDispatch([calResponse, &messageCount](const std::string &dev, Message *msg) {
906         if (msg->GetMessageId() != TIME_SYNC_MESSAGE && msg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
907             return;
908         }
909         if (dev != DEVICE_B || (!calResponse && msg->GetMessageType() != TYPE_REQUEST)) {
910             return;
911         }
912         messageCount++;
913     });
914 }
915 
ReOpenDB()916 void ReOpenDB()
917 {
918     ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
919     g_kvDelegatePtr = nullptr;
920     KvStoreNbDelegate::Option option;
921     option.secOption.securityLabel = SecurityLabel::S3;
922     option.secOption.securityFlag = SecurityFlag::SECE;
923     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
924     ASSERT_TRUE(g_kvDelegateStatus == OK);
925     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
926 }
927 }
928 
929 /**
930  * @tc.name: AckSessionCheck 001
931  * @tc.desc: Test ack session check function.
932  * @tc.type: FUNC
933  * @tc.require: AR000F3OOV
934  * @tc.author: zhangqiquan
935  */
936 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, AckSessionCheck001, TestSize.Level3)
937 {
938     std::vector<std::string> devices;
939     devices.push_back(g_deviceB->GetDeviceId());
940 
941     /**
942      * @tc.steps: step1. deviceB sync to deviceA just for timeSync and abilitySync
943      * @tc.expected: step1. should return OK.
944      */
945     ASSERT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
946 
947     /**
948      * @tc.steps: step2. deviceA StartTransaction for prevent other sync action deviceB sync will fail
949      * @tc.expected: step2. should return OK.
950      */
951     ASSERT_TRUE(g_kvDelegatePtr->StartTransaction() == OK);
952 
953     bool errCodeAck = false;
954     bool afterErrAck = false;
955     RegOnDispatchWithDelayAck(errCodeAck, afterErrAck);
956 
957     Key key = {'1'};
958     Value value = {'1'};
959     Timestamp currentTime;
960     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
961     EXPECT_TRUE(g_deviceB->PutData(key, value, currentTime, 0) == E_OK);
962     EXPECT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
963 
964     Value outValue;
965     EXPECT_TRUE(g_kvDelegatePtr->Get(key, outValue) == NOT_FOUND);
966 
967     /**
968      * @tc.steps: step3. release the writeHandle and try again, sync success
969      * @tc.expected: step3. should return OK.
970      */
971     EXPECT_TRUE(g_kvDelegatePtr->Commit() == OK);
972     EXPECT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
973 
974     EXPECT_TRUE(g_kvDelegatePtr->Get(key, outValue) == E_OK);
975     EXPECT_EQ(outValue, value);
976 }
977 
978 /**
979  * @tc.name: AckSafeCheck001
980  * @tc.desc: Test ack session check filter all bad ack in device offline scene.
981  * @tc.type: FUNC
982  * @tc.require: AR000F3OOV
983  * @tc.author: zhangqiquan
984  */
985 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, AckSafeCheck001, TestSize.Level3)
986 {
987     std::vector<std::string> devices;
988     devices.push_back(g_deviceB->GetDeviceId());
989     g_deviceB->Online();
990 
991     Key key = {'1'};
992     Query query = Query::Select().PrefixKey(key);
993     PrepareEnv(devices, key, query);
994 
995     std::condition_variable conditionOnline;
996     std::condition_variable conditionOffline;
997     bool onlineFlag = false;
998     bool invalid = false;
999     bool offlineFlag = false;
__anon728505271602() 1000     thread subThread([&onlineFlag, &conditionOnline, &offlineFlag, &conditionOffline]() {
1001         LOGW("[Dispatch] NOW DEVICES IS OFFLINE");
1002         std::mutex offlineMtx;
1003         std::unique_lock<std::mutex> lck(offlineMtx);
1004         conditionOffline.wait(lck, [&offlineFlag]{ return offlineFlag; });
1005         g_deviceB->Offline();
1006         std::this_thread::sleep_for(std::chrono::seconds(1));
1007         g_deviceB->Online();
1008         onlineFlag = true;
1009         conditionOnline.notify_all();
1010         LOGW("[Dispatch] NOW DEVICES IS ONLINE");
1011     });
1012     subThread.detach();
1013 
1014     RegOnDispatchWithOffline(offlineFlag, invalid, conditionOffline);
1015 
1016     SyncWithDeviceOffline(devices, key, query);
1017 
1018     std::mutex onlineMtx;
1019     std::unique_lock<std::mutex> lck(onlineMtx);
__anon728505271802null1020     conditionOnline.wait(lck, [&onlineFlag]{ return onlineFlag; });
1021 
1022     /**
1023      * @tc.steps: step4. sync again if has problem it will sync never end
1024      * @tc.expected: step4. should return OK.
1025      */
1026     SyncWithQuery(devices, query, OK);
1027 }
1028 
1029 /**
1030  * @tc.name: WaterMarkCheck001
1031  * @tc.desc: Test waterMark work correct in lost package scene.
1032  * @tc.type: FUNC
1033  * @tc.require: AR000F3OOV
1034  * @tc.author: zhangqiquan
1035  */
1036 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, WaterMarkCheck001, TestSize.Level1)
1037 {
1038     std::vector<std::string> devices;
1039     Query query = Query::Select();
1040     PrepareWaterMarkError(devices, query);
1041 
1042     /**
1043      * @tc.steps: step4. sync again see it work correct
1044      * @tc.expected: step4. should return OK.
1045      */
1046     SyncWithQuery(devices, query, OK);
1047 }
1048 
1049 /**
1050  * @tc.name: WaterMarkCheck002
1051  * @tc.desc: Test pull work correct in error waterMark scene.
1052  * @tc.type: FUNC
1053  * @tc.require: AR000F3OOV
1054  * @tc.author: zhangqiquan
1055  */
1056 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, WaterMarkCheck002, TestSize.Level1)
1057 {
1058     std::vector<std::string> devices;
1059     Query query = Query::Select();
1060     PrepareWaterMarkError(devices, query);
1061 
1062     /**
1063      * @tc.steps: step4. sync again see it work correct
1064      * @tc.expected: step4. should return OK.
1065      */
1066     Key key = {'2'};
1067     ASSERT_TRUE(g_kvDelegatePtr->Put(key, {}) == OK);
1068     query = Query::Select();
1069     SyncWithQuery(devices, query, DistributedDB::SYNC_MODE_PULL_ONLY, OK);
1070 
1071     VirtualDataItem item;
1072     EXPECT_EQ(g_deviceB->GetData(key, item), -E_NOT_FOUND);
1073 }
1074 
RegOnDispatchToGetSyncCount(int & sendRequestCount,int sleepMs=0)1075 void RegOnDispatchToGetSyncCount(int &sendRequestCount, int sleepMs = 0)
1076 {
1077     g_communicatorAggregator->RegOnDispatch([sleepMs, &sendRequestCount](
1078             const std::string &dev, Message *inMsg) {
1079         if (dev == DEVICE_B && inMsg->GetMessageType() == TYPE_REQUEST) {
1080             std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
1081             sendRequestCount++;
1082             LOGD("sendRequestCount++...");
1083         }
1084     });
1085 }
1086 
TestDifferentSyncMode(SyncMode mode)1087 void TestDifferentSyncMode(SyncMode mode)
1088 {
1089     std::vector<std::string> devices;
1090     devices.push_back(g_deviceB->GetDeviceId());
1091 
1092     /**
1093      * @tc.steps: step1. deviceA put {k1, v1}
1094      */
1095     Key key = {'1'};
1096     Value value = {'1'};
1097     DBStatus status = g_kvDelegatePtr->Put(key, value);
1098     ASSERT_TRUE(status == OK);
1099 
1100     int sendRequestCount = 0;
1101     RegOnDispatchToGetSyncCount(sendRequestCount);
1102 
1103     /**
1104      * @tc.steps: step2. deviceA call sync and wait
1105      * @tc.expected: step2. sync should return OK.
1106      */
1107     std::map<std::string, DBStatus> result;
1108     status = g_tool.SyncTest(g_kvDelegatePtr, devices, mode, result);
1109     ASSERT_TRUE(status == OK);
1110 
1111     /**
1112      * @tc.expected: step2. onComplete should be called, DeviceB have {k1,v1}, send request message 3 times
1113      */
1114     ASSERT_TRUE(result.size() == devices.size());
1115     for (const auto &pair : result) {
1116         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1117         EXPECT_TRUE(pair.second == OK);
1118     }
1119     VirtualDataItem item;
1120     g_deviceB->GetData(key, item);
1121     EXPECT_TRUE(item.value == value);
1122 
1123     EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1124 
1125     /**
1126      * @tc.steps: step3. reset sendRequestCount to 0, deviceA call sync and wait again without any change in db
1127      * @tc.expected: step3. sync should return OK, and sendRequestCount should be 1, because this merge can not
1128      * be skipped
1129      */
1130     sendRequestCount = 0;
1131     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1132     ASSERT_TRUE(status == OK);
1133     EXPECT_EQ(sendRequestCount, 1);
1134 }
1135 
1136 /**
1137  * @tc.name: PushSyncMergeCheck001
1138  * @tc.desc: Test push sync task merge, task can not be merged when the two sync task is not in the queue
1139  * at the same time.
1140  * @tc.type: FUNC
1141  * @tc.require: AR000F3OOV
1142  * @tc.author: zhangshijie
1143  */
1144 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck001, TestSize.Level1)
1145 {
1146     TestDifferentSyncMode(SYNC_MODE_PUSH_ONLY);
1147 }
1148 
1149 /**
1150  * @tc.name: PushSyncMergeCheck002
1151  * @tc.desc: Test push_pull sync task merge, task can not be merged when the two sync task is not in the queue
1152  * at the same time.
1153  * @tc.type: FUNC
1154  * @tc.require: AR000F3OOV
1155  * @tc.author: zhangshijie
1156  */
1157 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck002, TestSize.Level1)
1158 {
1159     TestDifferentSyncMode(SYNC_MODE_PUSH_PULL);
1160 }
1161 
PrepareForSyncMergeTest(std::vector<std::string> & devices,int & sendRequestCount)1162 void PrepareForSyncMergeTest(std::vector<std::string> &devices, int &sendRequestCount)
1163 {
1164     /**
1165      * @tc.steps: step1. deviceA put {k1, v1}
1166      */
1167     Key key = {'1'};
1168     Value value = {'1'};
1169     DBStatus status = g_kvDelegatePtr->Put(key, value);
1170     ASSERT_TRUE(status == OK);
1171 
1172     RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1173 
1174     /**
1175      * @tc.steps: step2. deviceA call sync and don't wait
1176      * @tc.expected: step2. sync should return OK.
1177      */
1178     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
1179         [&sendRequestCount, devices, key, value](const std::map<std::string, DBStatus>& statusMap) {
1180         ASSERT_TRUE(statusMap.size() == devices.size());
1181         for (const auto &pair : statusMap) {
1182             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1183             EXPECT_TRUE(pair.second == OK);
1184         }
1185         VirtualDataItem item;
1186         g_deviceB->GetData(key, item);
1187         EXPECT_EQ(item.value, value);
1188         EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1189 
1190         // reset sendRequestCount to 0
1191         sendRequestCount = 0;
1192     });
1193     ASSERT_TRUE(status == OK);
1194 }
1195 
1196 /**
1197  * @tc.name: PushSyncMergeCheck003
1198  * @tc.desc: Test push sync task merge, task can not be merged when there is change in db since last push sync
1199  * @tc.type: FUNC
1200  * @tc.require: AR000F3OOV
1201  * @tc.author: zhangshijie
1202  */
1203 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck003, TestSize.Level3)
1204 {
1205     DBStatus status = OK;
1206     std::vector<std::string> devices;
1207     devices.push_back(g_deviceB->GetDeviceId());
1208 
1209     int sendRequestCount = 0;
1210     PrepareForSyncMergeTest(devices, sendRequestCount);
1211 
1212     /**
1213      * @tc.steps: step3. deviceA call sync and don't wait
1214      * @tc.expected: step3. sync should return OK.
1215      */
1216     Key key = {'1'};
1217     Value value = {'2'};
1218     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon728505271b02(const std::map<std::string, DBStatus>& statusMap) 1219         [&sendRequestCount, devices, key, value, this](const std::map<std::string, DBStatus>& statusMap) {
1220         /**
1221          * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1222          * skipped, but it is no need to do time sync and ability sync, only need to do data sync
1223          */
1224         ASSERT_TRUE(statusMap.size() == devices.size());
1225         for (const auto &pair : statusMap) {
1226             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1227             EXPECT_TRUE(pair.second == OK);
1228         }
1229         VirtualDataItem item;
1230         g_deviceB->GetData(key, item);
1231         EXPECT_EQ(item.value, value);
1232     });
1233     ASSERT_TRUE(status == OK);
1234 
1235     /**
1236      * @tc.steps: step4. deviceA put {k1, v2}
1237      */
1238     while (sendRequestCount < TWO_CNT) {
1239         std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1240     }
1241     status = g_kvDelegatePtr->Put(key, value);
1242     ASSERT_TRUE(status == OK);
1243     // wait for the second sync task finish
1244     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1245     EXPECT_EQ(sendRequestCount, 1);
1246 }
1247 
1248 /**
1249  * @tc.name: PushSyncMergeCheck004
1250  * @tc.desc: Test push sync task merge, task can be merged when there is no change in db since last push sync
1251  * @tc.type: FUNC
1252  * @tc.require: AR000F3OOV
1253  * @tc.author: zhangshijie
1254  */
1255 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck004, TestSize.Level3)
1256 {
1257     DBStatus status = OK;
1258     std::vector<std::string> devices;
1259     devices.push_back(g_deviceB->GetDeviceId());
1260 
1261     int sendRequestCount = 0;
1262     PrepareForSyncMergeTest(devices, sendRequestCount);
1263 
1264     /**
1265      * @tc.steps: step3. deviceA call sync and don't wait
1266      * @tc.expected: step3. sync should return OK.
1267      */
1268     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon728505271c02(const std::map<std::string, DBStatus>& statusMap) 1269         [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1270         /**
1271          * @tc.expected: when the second sync task return, sendRequestCount should be 0, because this merge can  be
1272          * skipped
1273          */
1274         ASSERT_TRUE(statusMap.size() == devices.size());
1275         for (const auto &pair : statusMap) {
1276             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1277             EXPECT_TRUE(pair.second == OK);
1278         }
1279     });
1280     ASSERT_TRUE(status == OK);
1281     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1282     EXPECT_EQ(sendRequestCount, 0);
1283 }
1284 
RegOnDispatchWithInvalidMsgAndCnt(int & sendRequestCount,int sleepMs,bool & invalid)1285 void RegOnDispatchWithInvalidMsgAndCnt(int &sendRequestCount, int sleepMs, bool &invalid)
1286 {
1287     g_communicatorAggregator->RegOnDispatch([&sendRequestCount, sleepMs, &invalid](
1288         const std::string &dev, Message *inMsg) {
1289         if (dev == DEVICE_B && !invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
1290             inMsg->SetMessageType(TYPE_INVALID);
1291             inMsg->SetMessageId(INVALID_MESSAGE_ID);
1292             sendRequestCount++;
1293             invalid = true;
1294             LOGW("[Dispatch]invalid THIS MSG, sendRequestCount = %d", sendRequestCount);
1295             std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
1296         }
1297     });
1298 }
1299 
1300 /**
1301  * @tc.name: PushSyncMergeCheck005
1302  * @tc.desc: Test push sync task merge, task cannot be merged when the last push sync is failed
1303  * @tc.type: FUNC
1304  * @tc.require: AR000F3OOV
1305  * @tc.author: zhangshijie
1306  */
1307 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck005, TestSize.Level3)
1308 {
1309     DBStatus status = OK;
1310     std::vector<std::string> devices;
1311     devices.push_back(g_deviceB->GetDeviceId());
1312 
1313     /**
1314      * @tc.steps: step1. deviceA put {k1, v1}
1315      */
1316     Key key = {'1'};
1317     Value value = {'1'};
1318     status = g_kvDelegatePtr->Put(key, value);
1319     ASSERT_TRUE(status == OK);
1320 
1321     int sendRequestCount = 0;
1322     bool invalid = false;
1323     RegOnDispatchWithInvalidMsgAndCnt(sendRequestCount, SLEEP_MILLISECONDS, invalid);
1324 
1325     /**
1326      * @tc.steps: step2. deviceA call sync and don't wait
1327      * @tc.expected: step2. sync should return TIME_OUT.
1328      */
1329     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon728505271e02(const std::map<std::string, DBStatus>& statusMap) 1330         [&sendRequestCount, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1331         ASSERT_TRUE(statusMap.size() == devices.size());
1332         for (const auto &deviceId : devices) {
1333             ASSERT_EQ(statusMap.at(deviceId), TIME_OUT);
1334         }
1335     });
1336     EXPECT_TRUE(status == OK);
1337 
1338     /**
1339      * @tc.steps: step3. deviceA call sync and don't wait
1340      * @tc.expected: step3. sync should return OK.
1341      */
1342     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon728505271f02(const std::map<std::string, DBStatus>& statusMap) 1343         [key, value, &sendRequestCount, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1344         /**
1345          * @tc.expected: when the second sync task return, sendRequestCount should be 3, because this merge can not be
1346          * skipped, deviceB should have {k1, v1}.
1347          */
1348         ASSERT_TRUE(statusMap.size() == devices.size());
1349         for (const auto &pair : statusMap) {
1350             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1351             EXPECT_EQ(pair.second, OK);
1352         }
1353         VirtualDataItem item;
1354         g_deviceB->GetData(key, item);
1355         EXPECT_EQ(item.value, value);
1356     });
1357     ASSERT_TRUE(status == OK);
1358     while (sendRequestCount < 1) {
1359         std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1360     }
1361     sendRequestCount = 0;
1362     RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1363 
1364     // wait for the second sync task finish
1365     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1366     EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1367 }
1368 
PrePareForQuerySyncMergeTest(bool isQuerySync,std::vector<std::string> & devices,Key & key,Value & value,int & sendRequestCount)1369 void PrePareForQuerySyncMergeTest(bool isQuerySync, std::vector<std::string> &devices,
1370     Key &key, Value &value, int &sendRequestCount)
1371 {
1372     DBStatus status = OK;
1373     /**
1374      * @tc.steps: step1. deviceA put {k1, v1}...{k10, v10}
1375      */
1376     Query query = Query::Select().PrefixKey(key);
1377     const int dataSize = 10;
1378     for (int i = 0; i < dataSize; i++) {
1379         key.push_back(i);
1380         value.push_back(i);
1381         status = g_kvDelegatePtr->Put(key, value);
1382         ASSERT_TRUE(status == OK);
1383         key.pop_back();
1384         value.pop_back();
1385     }
1386 
1387     RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1388     /**
1389      * @tc.steps: step2. deviceA call query sync and don't wait
1390      * @tc.expected: step2. sync should return OK.
1391      */
1392     auto completeCallBack = [&sendRequestCount, &key, &value, dataSize, devices]
1393         (const std::map<std::string, DBStatus>& statusMap) {
1394         ASSERT_TRUE(statusMap.size() == devices.size());
1395         for (const auto &pair : statusMap) {
1396             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1397             EXPECT_EQ(pair.second, OK);
1398         }
1399         // when first sync finish, DeviceB have {k1,v1}, {k3,v3}, {k5,v5} .. send request message 3 times
1400         VirtualDataItem item;
1401         for (int i = 0; i < dataSize; i++) {
1402             key.push_back(i);
1403             value.push_back(i);
1404             g_deviceB->GetData(key, item);
1405             EXPECT_EQ(item.value, value);
1406             key.pop_back();
1407             value.pop_back();
1408         }
1409         EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1410         // reset sendRequestCount to 0
1411         sendRequestCount = 0;
1412     };
1413     if (isQuerySync) {
1414         status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, completeCallBack, query, false);
1415     } else {
1416         status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, completeCallBack);
1417     }
1418     ASSERT_TRUE(status == OK);
1419 }
1420 
1421 /**
1422  * @tc.name: QuerySyncMergeCheck001
1423  * @tc.desc: Test query push sync task merge, task can be merged when there is no change in db since last query sync
1424  * @tc.type: FUNC
1425  * @tc.require: AR000F3OOV
1426  * @tc.author: zhangshijie
1427  */
1428 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck001, TestSize.Level3)
1429 {
1430     std::vector<std::string> devices;
1431     int sendRequestCount = 0;
1432     devices.push_back(g_deviceB->GetDeviceId());
1433 
1434     Key key {'1'};
1435     Value value {'1'};
1436     Query query = Query::Select().PrefixKey(key);
1437     PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1438 
1439     /**
1440      * @tc.steps: step3. deviceA call query sync and don't wait
1441      * @tc.expected: step3. sync should return OK.
1442      */
1443     DBStatus status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon728505272102(const std::map<std::string, DBStatus>& statusMap) 1444         [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1445         /**
1446          * @tc.expected: when the second sync task return, sendRequestCount should be 0, because this merge can be
1447          * skipped because there is no change in db since last query sync
1448          */
1449         ASSERT_TRUE(statusMap.size() == devices.size());
1450         for (const auto &pair : statusMap) {
1451             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1452             EXPECT_TRUE(pair.second == OK);
1453         }
1454     }, query, false);
1455     ASSERT_TRUE(status == OK);
1456     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1457     EXPECT_EQ(sendRequestCount, 0);
1458 }
1459 
1460 /**
1461  * @tc.name: QuerySyncMergeCheck002
1462  * @tc.desc: Test query push sync task merge, task can not be merged when there is change in db since last sync
1463  * @tc.type: FUNC
1464  * @tc.require: AR000F3OOV
1465  * @tc.author: zhangshijie
1466  */
1467 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck002, TestSize.Level3)
1468 {
1469     std::vector<std::string> devices;
1470     int sendRequestCount = 0;
1471     devices.push_back(g_deviceB->GetDeviceId());
1472 
1473     Key key {'1'};
1474     Value value {'1'};
1475     Query query = Query::Select().PrefixKey(key);
1476     PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1477 
1478     /**
1479      * @tc.steps: step3. deviceA call query sync and don't wait
1480      * @tc.expected: step3. sync should return OK.
1481      */
1482     Value value3{'3'};
1483     DBStatus status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon728505272202(const std::map<std::string, DBStatus>& statusMap) 1484         [&sendRequestCount, devices, key, value3, this](const std::map<std::string, DBStatus>& statusMap) {
1485         /**
1486          * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1487          * skipped when there is change in db since last query sync, deviceB have {k1, v1'}
1488          */
1489         ASSERT_TRUE(statusMap.size() == devices.size());
1490         for (const auto &pair : statusMap) {
1491             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1492             EXPECT_TRUE(pair.second == OK);
1493         }
1494         VirtualDataItem item;
1495         g_deviceB->GetData(key, item);
1496         EXPECT_TRUE(item.value == value3);
1497         EXPECT_EQ(sendRequestCount, 1);
1498         }, query, false);
1499     ASSERT_TRUE(status == OK);
1500 
1501     /**
1502      * @tc.steps: step4. deviceA put {k1, v1'}
1503      * @tc.steps: step4. reset sendRequestCount to 0, deviceA call sync and wait
1504      * @tc.expected: step4. sync should return OK, and sendRequestCount should be 1, because this merge can not
1505      * be skipped
1506      */
1507     while (sendRequestCount < TWO_CNT) {
1508         std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1509     }
1510     g_kvDelegatePtr->Put(key, value3);
1511     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1512 }
1513 
1514 /**
1515  * @tc.name: QuerySyncMergeCheck003
1516  * @tc.desc: Test query push sync task merge, task can not be merged when then query id is different
1517  * @tc.type: FUNC
1518  * @tc.require: AR000F3OOV
1519  * @tc.author: zhangshijie
1520  */
1521 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck003, TestSize.Level3)
1522 {
1523     std::vector<std::string> devices;
1524     int sendRequestCount = 0;
1525     devices.push_back(g_deviceB->GetDeviceId());
1526 
1527     Key key {'1'};
1528     Value value {'1'};
1529     PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1530 
1531     /**
1532      * @tc.steps: step3.  deviceA call another query sync
1533      * @tc.expected: step3. sync should return OK.
1534      */
1535     Key key2 = {'2'};
1536     Value value2 = {'2'};
1537     DBStatus status = g_kvDelegatePtr->Put(key2, value2);
1538     ASSERT_TRUE(status == OK);
1539     Query query2 = Query::Select().PrefixKey(key2);
1540     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon728505272302(const std::map<std::string, DBStatus>& statusMap) 1541         [&sendRequestCount, key2, value2, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1542         /**
1543          * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1544          * skipped, deviceB have {k2,v2}
1545          */
1546         ASSERT_TRUE(statusMap.size() == devices.size());
1547         for (const auto &pair : statusMap) {
1548             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1549             EXPECT_TRUE(pair.second == OK);
1550         }
1551         VirtualDataItem item;
1552         g_deviceB->GetData(key2, item);
1553         EXPECT_TRUE(item.value == value2);
1554         EXPECT_EQ(sendRequestCount, 1);
1555         }, query2, false);
1556     ASSERT_TRUE(status == OK);
1557     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1558 }
1559 
1560 /**
1561 * @tc.name: QuerySyncMergeCheck004
1562 * @tc.desc: Test query push sync task merge, task can be merged when there is no change in db since last push sync
1563 * @tc.type: FUNC
1564 * @tc.require: AR000F3OOV
1565 * @tc.author: zhangshijie
1566 */
1567 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck004, TestSize.Level3)
1568 {
1569     DBStatus status = OK;
1570     std::vector<std::string> devices;
1571     devices.push_back(g_deviceB->GetDeviceId());
1572 
1573     Key key {'1'};
1574     Value value {'1'};
1575     int sendRequestCount = 0;
1576     PrePareForQuerySyncMergeTest(false, devices, key, value, sendRequestCount);
1577 
1578     /**
1579      * @tc.steps: step3. deviceA call query sync without any change in db
1580      * @tc.expected: step3. sync should return OK, and sendRequestCount should be 0, because this merge can be skipped
1581      */
1582     Query query = Query::Select().PrefixKey(key);
1583     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon728505272402(const std::map<std::string, DBStatus>& statusMap) 1584         [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1585             /**
1586              * @tc.expected step3: when the second sync task return, sendRequestCount should be 0, because this merge
1587              * can be skipped because there is no change in db since last push sync
1588              */
1589             ASSERT_TRUE(statusMap.size() == devices.size());
1590             for (const auto &pair : statusMap) {
1591                 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1592                 EXPECT_TRUE(pair.second == OK);
1593             }
1594         }, query, false);
1595     ASSERT_TRUE(status == OK);
1596     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1597     EXPECT_EQ(sendRequestCount, 0);
1598 }
1599 
1600 /**
1601   * @tc.name: GetDataNotify001
1602   * @tc.desc: Test GetDataNotify function, delay < 30s should sync ok, > 36 should timeout
1603   * @tc.type: FUNC
1604   * @tc.require: AR000D4876
1605   * @tc.author: zhangqiquan
1606   */
1607 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify001, TestSize.Level3)
1608 {
1609     ASSERT_NE(g_kvDelegatePtr, nullptr);
1610     DBStatus status = OK;
1611     std::vector<std::string> devices;
1612     devices.push_back(g_deviceB->GetDeviceId());
1613     const std::string DEVICE_A = "real_device";
1614     /**
1615      * @tc.steps: step1. deviceB set get data delay 40s
1616      */
1617     g_deviceB->DelayGetSyncData(WAIT_40_SECONDS);
1618     g_communicatorAggregator->SetTimeout(DEVICE_A, TIMEOUT_6_SECONDS);
1619 
1620     /**
1621      * @tc.steps: step2. deviceA call sync and wait
1622      * @tc.expected: step2. sync should return OK. onComplete should be called, deviceB sync TIME_OUT.
1623      */
1624     std::map<std::string, DBStatus> result;
1625     std::map<std::string, int> virtualRes;
1626     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1627     EXPECT_EQ(status, OK);
1628     EXPECT_EQ(result.size(), devices.size());
1629     EXPECT_EQ(result[DEVICE_B], TIME_OUT);
1630     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1631     Query query = Query::Select();
__anon728505272502(std::map<std::string, int> resMap) 1632     g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1633         virtualRes = std::move(resMap);
1634     }, true);
1635     EXPECT_EQ(virtualRes.size(), devices.size());
1636     EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_TIMEOUT));
1637     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1638 
1639     /**
1640      * @tc.steps: step3. deviceB set get data delay 30s
1641      */
1642     g_deviceB->DelayGetSyncData(WAIT_30_SECONDS);
1643     /**
1644      * @tc.steps: step4. deviceA call sync and wait
1645      * @tc.expected: step4. sync should return OK. onComplete should be called, deviceB sync OK.
1646      */
1647     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1648     EXPECT_EQ(status, OK);
1649     EXPECT_EQ(result.size(), devices.size());
1650     EXPECT_EQ(result[DEVICE_B], OK);
1651     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
__anon728505272602(std::map<std::string, int> resMap) 1652     g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1653         virtualRes = std::move(resMap);
1654     }, true);
1655     EXPECT_EQ(virtualRes.size(), devices.size());
1656     EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_FINISHED_ALL));
1657     g_deviceB->DelayGetSyncData(0);
1658 }
1659 
1660 /**
1661   * @tc.name: GetDataNotify002
1662   * @tc.desc: Test GetDataNotify function, two device sync each other at same time
1663   * @tc.type: FUNC
1664   * @tc.require: AR000D4876
1665   * @tc.author: zhangqiquan
1666   */
1667 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify002, TestSize.Level3)
1668 {
1669     ASSERT_NE(g_kvDelegatePtr, nullptr);
1670     DBStatus status = OK;
1671     std::vector<std::string> devices;
1672     devices.push_back(g_deviceB->GetDeviceId());
1673     const std::string DEVICE_A = "real_device";
1674 
1675     /**
1676      * @tc.steps: step1. deviceA sync first to finish time sync and ability sync
1677      */
1678     std::map<std::string, DBStatus> result;
1679     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1680     EXPECT_EQ(status, OK);
1681     EXPECT_EQ(result.size(), devices.size());
1682     EXPECT_EQ(result[DEVICE_B], OK);
1683     /**
1684      * @tc.steps: step2. deviceB set get data delay 30s
1685      */
1686     g_deviceB->DelayGetSyncData(WAIT_30_SECONDS);
1687 
1688     /**
1689      * @tc.steps: step3. deviceB call sync and wait
1690      */
__anon728505272702() 1691     std::thread asyncThread([]() {
1692         std::map<std::string, int> virtualRes;
1693         Query query = Query::Select();
1694         g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1695                 virtualRes = std::move(resMap);
1696             }, true);
1697     });
1698 
1699     /**
1700      * @tc.steps: step4. deviceA call sync and wait
1701      * @tc.expected: step4. sync should return OK. because notify timer trigger (30s - 1s)/2s => 15times
1702      */
1703     std::this_thread::sleep_for(std::chrono::seconds(1));
1704     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1705     EXPECT_EQ(status, OK);
1706     EXPECT_EQ(result.size(), devices.size());
1707     EXPECT_EQ(result[DEVICE_B], OK);
1708     asyncThread.join();
1709     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1710 }
1711 
1712 /**
1713  * @tc.name: DelaySync001
1714  * @tc.desc: Test delay first packet will not effect data conflict
1715  * @tc.type: FUNC
1716  * @tc.require:
1717  * @tc.author: zqq
1718  */
1719 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, DelaySync001, TestSize.Level3)
1720 {
1721     // B put (k, b) after A put (k, a)
1722     Key key = {'k'};
1723     Value aValue = {'a'};
1724     g_kvDelegatePtr->Put(key, aValue);
1725     std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for data conflict
1726     Timestamp currentTime = TimeHelper::GetSysCurrentTime() + TimeHelper::BASE_OFFSET;
1727     Value bValue = {'b'};
1728     EXPECT_EQ(g_deviceB->PutData(key, bValue, currentTime, 0), E_OK);
1729 
1730     // delay time sync message, delay time/2 should greater than put sleep time
1731     g_communicatorAggregator->SetTimeout(DEVICE_B, DBConstant::MAX_TIMEOUT);
1732     g_communicatorAggregator->SetTimeout("real_device", DBConstant::MAX_TIMEOUT);
__anon728505272902(const std::string &dstTarget, const Message *msg) 1733     g_communicatorAggregator->RegBeforeDispatch([](const std::string &dstTarget, const Message *msg) {
1734         if (dstTarget == DEVICE_B && msg->GetMessageId() == MessageId::TIME_SYNC_MESSAGE) {
1735             std::this_thread::sleep_for(std::chrono::seconds(3)); // sleep for 3s
1736         }
1737     });
1738 
1739     // A call sync and (k, b) in A
1740     std::vector<std::string> devices;
1741     devices.push_back(g_deviceB->GetDeviceId());
1742     std::map<std::string, DBStatus> result;
1743     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1744     EXPECT_EQ(status, OK);
1745     EXPECT_EQ(result.size(), devices.size());
1746     EXPECT_EQ(result[DEVICE_B], OK);
1747 
1748     Value actualValue;
1749     g_kvDelegatePtr->Get(key, actualValue);
1750     EXPECT_EQ(actualValue, bValue);
1751     g_communicatorAggregator->RegBeforeDispatch(nullptr);
1752 }
1753 
1754 /**
1755  * @tc.name: KVAbilitySyncOpt001
1756  * @tc.desc: check ability sync 2 packet
1757  * @tc.type: FUNC
1758  * @tc.require:
1759  * @tc.author: zhangqiquan
1760  */
1761 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVAbilitySyncOpt001, TestSize.Level0)
1762 {
1763     /**
1764      * @tc.steps: step1. record packet
1765      * @tc.expected: step1. sync should failed in source.
1766      */
1767     std::atomic<int> messageCount = 0;
__anon728505272a02(const std::string &dev, Message *msg) 1768     g_communicatorAggregator->RegOnDispatch([&messageCount](const std::string &dev, Message *msg) {
1769         if (msg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
1770             return;
1771         }
1772         messageCount++;
1773         EXPECT_GE(g_kvDelegatePtr->GetTaskCount(), 1);
1774     });
1775     /**
1776      * @tc.steps: step2. deviceA call sync and wait
1777      * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
1778      */
1779     DBStatus status = OK;
1780     std::vector<std::string> devices;
1781     devices.push_back(g_deviceB->GetDeviceId());
1782     std::map<std::string, DBStatus> result;
1783     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1784     EXPECT_EQ(status, OK);
1785     EXPECT_EQ(messageCount, 2); // 2 ability sync
1786     for (const auto &pair : result) {
1787         EXPECT_EQ(pair.second, OK);
1788     }
1789 }
1790 
1791 /**
1792  * @tc.name: KVAbilitySyncOpt002
1793  * @tc.desc: check get task count while conn is nullptr.
1794  * @tc.type: FUNC
1795  * @tc.require:
1796  * @tc.author: caihaoting
1797  */
1798 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVAbilitySyncOpt002, TestSize.Level0)
1799 {
1800     /**
1801      * @tc.steps: step1. record packet while conn is nullptr.
1802      * @tc.expected: step1. sync should failed in source and get task count return DB_ERROR.
1803      */
1804     auto kvStoreImpl = static_cast<KvStoreNbDelegateImpl *>(g_kvDelegatePtr);
1805     EXPECT_EQ(kvStoreImpl->Close(), OK);
1806     std::atomic<int> messageCount = 0;
__anon728505272b02(const std::string &dev, Message *msg) 1807     g_communicatorAggregator->RegOnDispatch([&messageCount](const std::string &dev, Message *msg) {
1808         if (msg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
1809             return;
1810         }
1811         messageCount++;
1812         EXPECT_EQ(g_kvDelegatePtr->GetTaskCount(), DB_ERROR);
1813     });
1814 }
1815 
1816 /**
1817  * @tc.name: KVSyncOpt001
1818  * @tc.desc: check time sync and ability sync once
1819  * @tc.type: FUNC
1820  * @tc.require:
1821  * @tc.author: zhangqiquan
1822  */
1823 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt001, TestSize.Level0)
1824 {
1825     /**
1826      * @tc.steps: step1. record packet which send to B
1827      */
1828     std::atomic<int> messageCount = 0;
1829     RegOnDispatchWithoutDataPacket(messageCount);
1830     /**
1831      * @tc.steps: step2. deviceA call sync and wait
1832      * @tc.expected: step2. sync should return OK.
1833      */
1834     std::vector<std::string> devices;
1835     devices.push_back(g_deviceB->GetDeviceId());
1836     Sync(devices, OK);
1837     EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
1838     /**
1839      * @tc.steps: step3. reopen kv store
1840      * @tc.expected: step3. reopen OK.
1841      */
1842     ReOpenDB();
1843     /**
1844      * @tc.steps: step4. reopen kv store and sync again
1845      * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
1846      */
1847     messageCount = 0;
1848     Sync(devices, OK);
1849     EXPECT_EQ(messageCount, 0);
1850     g_communicatorAggregator->RegOnDispatch(nullptr);
1851 }
1852 
1853 /**
1854  * @tc.name: KVSyncOpt002
1855  * @tc.desc: check device time sync once
1856  * @tc.type: FUNC
1857  * @tc.require:
1858  * @tc.author: zhangqiquan
1859  */
1860 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt002, TestSize.Level0)
1861 {
1862 /**
1863      * @tc.steps: step1. record packet which send to B
1864      */
1865     std::atomic<int> messageCount = 0;
1866     RegOnDispatchWithoutDataPacket(messageCount);
1867     /**
1868      * @tc.steps: step2. deviceA call sync and wait
1869      * @tc.expected: step2. sync should return OK.
1870      */
1871     std::vector<std::string> devices;
1872     devices.push_back(g_deviceB->GetDeviceId());
1873     Sync(devices, OK);
1874     EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
1875     // close kv store avoid packet dispatch error
1876     ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1877     g_kvDelegatePtr = nullptr;
1878     ASSERT_EQ(g_mgr.DeleteKvStore(STORE_ID), OK);
1879     EXPECT_TRUE(RuntimeContext::GetInstance()->IsTimeTickMonitorValid());
1880     /**
1881      * @tc.steps: step3. open new kv store
1882      * @tc.expected: step3. open OK.
1883      */
1884     KvStoreNbDelegate::Option option;
1885     option.secOption.securityLabel = SecurityLabel::S3;
1886     option.secOption.securityFlag = SecurityFlag::SECE;
1887     KvStoreNbDelegate *delegate2 = nullptr;
__anon728505272c02(DBStatus status, KvStoreNbDelegate *delegate) 1888     g_mgr.GetKvStore(STORE_ID_2, option, [&delegate2](DBStatus status, KvStoreNbDelegate *delegate) {
1889         delegate2 = delegate;
1890         EXPECT_EQ(status, OK);
1891     });
1892     /**
1893      * @tc.steps: step4. sync again
1894      * @tc.expected: step4. sync success, only ability sync packet.
1895      */
1896     messageCount = 0;
1897     Sync(delegate2, devices, OK);
1898     EXPECT_EQ(messageCount, 1); // 1 contain ability sync packet
1899     EXPECT_EQ(g_mgr.CloseKvStore(delegate2), OK);
1900     EXPECT_EQ(g_mgr.DeleteKvStore(STORE_ID_2), OK);
1901     g_communicatorAggregator->RegOnDispatch(nullptr);
1902 }
1903 
1904 /**
1905  * @tc.name: KVSyncOpt003
1906  * @tc.desc: check time sync and ability sync once
1907  * @tc.type: FUNC
1908  * @tc.require:
1909  * @tc.author: zhangqiquan
1910  */
1911 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt003, TestSize.Level0)
1912 {
1913     /**
1914      * @tc.steps: step1. record packet which send to B
1915      */
1916     std::atomic<int> messageCount = 0;
1917     RegOnDispatchWithoutDataPacket(messageCount);
1918     /**
1919      * @tc.steps: step2. deviceA call sync and wait
1920      * @tc.expected: step2. sync should return OK.
1921      */
1922     std::vector<std::string> devices;
1923     devices.push_back(g_deviceB->GetDeviceId());
1924     Sync(devices, OK);
1925     EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
1926     /**
1927      * @tc.steps: step3. reopen kv store
1928      * @tc.expected: step3. reopen OK.
1929      */
1930     ReOpenDB();
1931     /**
1932      * @tc.steps: step4. reopen kv store and sync again
1933      * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
1934      */
1935     messageCount = 0;
1936     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
1937     EXPECT_EQ(messageCount, 0);
1938     g_communicatorAggregator->RegOnDispatch(nullptr);
1939 }
1940 
1941 /**
1942  * @tc.name: KVSyncOpt004
1943  * @tc.desc: check sync in keys after reopen db
1944  * @tc.type: FUNC
1945  * @tc.require:
1946  * @tc.author: zhangqiquan
1947  */
1948 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt004, TestSize.Level0)
1949 {
1950     /**
1951      * @tc.steps: step1. deviceA call sync and wait
1952      * @tc.expected: step1. sync should return OK.
1953      */
1954     std::vector<std::string> devices;
1955     devices.push_back(g_deviceB->GetDeviceId());
1956     Sync(devices, OK);
1957     /**
1958      * @tc.steps: step2. reopen kv store
1959      * @tc.expected: step2. reopen OK.
1960      */
1961     ReOpenDB();
1962     /**
1963      * @tc.steps: step3. sync with in keys
1964      * @tc.expected: step3. sync OK.
1965      */
1966     std::map<std::string, DBStatus> result;
1967     std::set<Key> condition;
1968     condition.insert({'k'});
1969     Query query = Query::Select().InKeys(condition);
1970     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result, query);
1971     EXPECT_EQ(status, OK);
1972     for (const auto &deviceId : devices) {
1973         EXPECT_EQ(result[deviceId], OK);
1974     }
1975 }
1976 
1977 /**
1978  * @tc.name: KVSyncOpt005
1979  * @tc.desc: check record ability finish after receive ability sync
1980  * @tc.type: FUNC
1981  * @tc.require:
1982  * @tc.author: zhangqiquan
1983  */
1984 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt005, TestSize.Level0)
1985 {
1986     /**
1987      * @tc.steps: step1. record packet which send to B
1988      */
1989     std::atomic<int> messageCount = 0;
1990     RegOnDispatchWithoutDataPacket(messageCount, true);
1991     /**
1992      * @tc.steps: step2. deviceB call sync and wait
1993      * @tc.expected: step2. sync should return OK.
1994      */
1995     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
1996     EXPECT_EQ(messageCount, 2); // DEV_A send negotiation 2 ack packet.
1997     /**
1998      * @tc.steps: step3. reopen kv store
1999      * @tc.expected: step3. reopen OK.
2000      */
2001     ReOpenDB();
2002     /**
2003      * @tc.steps: step4. reopen kv store and sync again
2004      * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
2005      */
2006     messageCount = 0;
2007     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2008     EXPECT_EQ(messageCount, 0);
2009     g_communicatorAggregator->RegOnDispatch(nullptr);
2010 }
2011 
2012 /**
2013  * @tc.name: KVSyncOpt006
2014  * @tc.desc: check time sync and ability sync once after rebuild
2015  * @tc.type: FUNC
2016  * @tc.require:
2017  * @tc.author: zhangqiquan
2018  */
2019 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt006, TestSize.Level0)
2020 {
2021     /**
2022      * @tc.steps: step1. record packet which send to B
2023      */
2024     std::atomic<int> messageCount = 0;
2025     RegOnDispatchWithoutDataPacket(messageCount, true);
2026     /**
2027      * @tc.steps: step2. deviceA call sync and wait
2028      * @tc.expected: step2. sync should return OK.
2029      */
2030     std::vector<std::string> devices;
2031     devices.push_back(g_deviceB->GetDeviceId());
2032     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2033     EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
2034     /**
2035      * @tc.steps: step3. rebuild kv store
2036      * @tc.expected: step3. rebuild OK.
2037      */
2038     ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
2039     g_kvDelegatePtr = nullptr;
2040     g_mgr.DeleteKvStore(STORE_ID);
2041     KvStoreNbDelegate::Option option;
2042     option.secOption.securityLabel = SecurityLabel::S3;
2043     option.secOption.securityFlag = SecurityFlag::SECE;
2044     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
2045     ASSERT_TRUE(g_kvDelegateStatus == OK);
2046     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2047     /**
2048      * @tc.steps: step4. rebuild kv store and sync again
2049      * @tc.expected: step4. rebuild OK and sync success, re ability sync.
2050      */
2051     messageCount = 0;
2052     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2053     EXPECT_EQ(messageCount, 1);
2054     g_communicatorAggregator->RegOnDispatch(nullptr);
2055 }
2056 
2057 /**
2058  * @tc.name: KVSyncOpt007
2059  * @tc.desc: check re ability sync after import
2060  * @tc.type: FUNC
2061  * @tc.require:
2062  * @tc.author: zhangqiquan
2063  */
2064 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt007, TestSize.Level0)
2065 {
2066     /**
2067      * @tc.steps: step1. record packet which send to B
2068      */
2069     std::atomic<int> messageCount = 0;
2070     RegOnDispatchWithoutDataPacket(messageCount, true);
2071     /**
2072      * @tc.steps: step2. deviceB call sync and wait
2073      * @tc.expected: step2. sync should return OK.
2074      */
2075     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2076     EXPECT_EQ(messageCount, 2); // DEV_A send negotiation 2 ack packet.
2077     /**
2078      * @tc.steps: step3. export and import
2079      * @tc.expected: step3. export and import OK.
2080      */
2081     std::string singleExportFileName = g_testDir + "/KVSyncOpt007.$$";
2082     CipherPassword passwd;
2083     EXPECT_EQ(g_kvDelegatePtr->Export(singleExportFileName, passwd), OK);
2084     EXPECT_EQ(g_kvDelegatePtr->Import(singleExportFileName, passwd), OK);
2085     /**
2086      * @tc.steps: step4. reopen kv store and sync again
2087      * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
2088      */
2089     messageCount = 0;
2090     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2091     EXPECT_EQ(messageCount, 1); // DEV_A send negotiation 1 ack packet.
2092     g_communicatorAggregator->RegOnDispatch(nullptr);
2093 }
2094 
2095 /**
2096  * @tc.name: KVTimeChange001
2097  * @tc.desc: check time sync and ability sync once
2098  * @tc.type: FUNC
2099  * @tc.require:
2100  * @tc.author: zhangqiquan
2101  */
2102 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVTimeChange001, TestSize.Level0)
2103 {
2104     /**
2105      * @tc.steps: step1. record packet which send to B
2106      */
2107     std::atomic<int> messageCount = 0;
2108     RegOnDispatchWithoutDataPacket(messageCount);
2109     /**
2110      * @tc.steps: step2. deviceA call sync and wait
2111      * @tc.expected: step2. sync should return OK.
2112      */
2113     std::vector<std::string> devices;
2114     devices.push_back(g_deviceB->GetDeviceId());
2115     Sync(devices, OK);
2116     EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
2117     /**
2118      * @tc.steps: step3. sync again
2119      * @tc.expected: step3. sync success, no negotiation packet.
2120      */
2121     messageCount = 0;
2122     Sync(devices, OK);
2123     EXPECT_EQ(messageCount, 0);
2124     /**
2125      * @tc.steps: step4. modify time offset and sync again
2126      * @tc.expected: step4. sync success, only time sync packet.
2127      */
2128     RuntimeContext::GetInstance()->NotifyTimestampChanged(100);
2129     RuntimeContext::GetInstance()->RecordAllTimeChange();
2130     RuntimeContext::GetInstance()->ClearAllDeviceTimeInfo();
2131     messageCount = 0;
2132     Sync(devices, OK);
2133     EXPECT_EQ(messageCount, 1); // 1 contain time sync request packet
2134     messageCount = 0;
2135     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2136     EXPECT_EQ(messageCount, 0);
2137     g_communicatorAggregator->RegOnDispatch(nullptr);
2138 }
2139 }