• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 
18 #include "ability_sync.h"
19 #include "distributeddb_data_generate_unit_test.h"
20 #include "distributeddb_tools_unit_test.h"
21 #include "kv_store_nb_delegate_impl.h"
22 #include "kv_virtual_device.h"
23 #include "platform_specific.h"
24 #include "process_system_api_adapter_impl.h"
25 #include "single_ver_data_packet.h"
26 #include "virtual_communicator_aggregator.h"
27 
28 using namespace testing::ext;
29 using namespace DistributedDB;
30 using namespace DistributedDBUnitTest;
31 using namespace std;
32 
33 namespace {
34     string g_testDir;
35     const string STORE_ID = "kv_stroe_sync_check_test";
36     const std::string DEVICE_B = "deviceB";
37     const std::string DEVICE_C = "deviceC";
38     const int LOCAL_WATER_MARK_NOT_INIT = 0xaa;
39     const int EIGHT_HUNDRED = 800;
40     const int NORMAL_SYNC_SEND_REQUEST_CNT = 3;
41     const int TWO_CNT = 2;
42     const int SLEEP_MILLISECONDS = 500;
43     const int TEN_SECONDS = 10;
44     const int THREE_HUNDRED = 300;
45     const int WAIT_30_SECONDS = 30000;
46     const int WAIT_40_SECONDS = 40000;
47     const int TIMEOUT_6_SECONDS = 6000;
48 
49     KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
50     KvStoreConfig g_config;
51     DistributedDBToolsUnitTest g_tool;
52     DBStatus g_kvDelegateStatus = INVALID_ARGS;
53     KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
54     VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
55     KvVirtualDevice* g_deviceB = nullptr;
56     KvVirtualDevice* g_deviceC = nullptr;
57     VirtualSingleVerSyncDBInterface *g_syncInterfaceB = nullptr;
58     VirtualSingleVerSyncDBInterface *g_syncInterfaceC = nullptr;
59 
60     // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
61     auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
62         placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
63 #ifndef LOW_LEVEL_MEM_DEV
64     const int KEY_LEN = 20; // 20 Bytes
65     const int VALUE_LEN = 4 * 1024 * 1024; // 4MB
66     const int ENTRY_NUM = 2; // 16 entries
67 #endif
68 
69 class DistributedDBSingleVerP2PSyncCheckTest : public testing::Test {
70 public:
71     static void SetUpTestCase(void);
72     static void TearDownTestCase(void);
73     void SetUp();
74     void TearDown();
75     void CancelTestInit(DeviceSyncOption &option, std::vector<Entry> &entries, const uint32_t mtuSize);
76     void CancelTestEnd(std::vector<Entry> &entries, const uint32_t mtuSize);
77 };
78 
SetUpTestCase(void)79 void DistributedDBSingleVerP2PSyncCheckTest::SetUpTestCase(void)
80 {
81     /**
82      * @tc.setup: Init datadir and Virtual Communicator.
83      */
84     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
85     g_config.dataDir = g_testDir;
86     g_mgr.SetKvStoreConfig(g_config);
87 
88     string dir = g_testDir + "/single_ver";
89     DIR* dirTmp = opendir(dir.c_str());
90     if (dirTmp == nullptr) {
91         OS::MakeDBDirectory(dir);
92     } else {
93         closedir(dirTmp);
94     }
95 
96     g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
97     ASSERT_TRUE(g_communicatorAggregator != nullptr);
98     RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
99 
100     std::shared_ptr<ProcessSystemApiAdapterImpl> g_adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
101     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(g_adapter);
102 }
103 
TearDownTestCase(void)104 void DistributedDBSingleVerP2PSyncCheckTest::TearDownTestCase(void)
105 {
106     /**
107      * @tc.teardown: Release virtual Communicator and clear data dir.
108      */
109     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
110         LOGE("rm test db files error!");
111     }
112     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
113     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
114 }
115 
SetUp(void)116 void DistributedDBSingleVerP2PSyncCheckTest::SetUp(void)
117 {
118     DistributedDBToolsUnitTest::PrintTestCaseInfo();
119     /**
120      * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
121      */
122     KvStoreNbDelegate::Option option;
123     option.secOption.securityLabel = SecurityLabel::S3;
124     option.secOption.securityFlag = SecurityFlag::SECE;
125     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
126     ASSERT_TRUE(g_kvDelegateStatus == OK);
127     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
128     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
129     ASSERT_TRUE(g_deviceB != nullptr);
130     g_syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
131     ASSERT_TRUE(g_syncInterfaceB != nullptr);
132     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, g_syncInterfaceB), E_OK);
133     SecurityOption virtualOption;
134     virtualOption.securityLabel = option.secOption.securityLabel;
135     virtualOption.securityFlag = option.secOption.securityFlag;
136     g_syncInterfaceB->SetSecurityOption(virtualOption);
137 
138     g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
139     ASSERT_TRUE(g_deviceC != nullptr);
140     g_syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
141     ASSERT_TRUE(g_syncInterfaceC != nullptr);
142     ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, g_syncInterfaceC), E_OK);
143     g_syncInterfaceC->SetSecurityOption(virtualOption);
144     RuntimeContext::GetInstance()->ClearAllDeviceTimeInfo();
145 }
146 
TearDown(void)147 void DistributedDBSingleVerP2PSyncCheckTest::TearDown(void)
148 {
149     /**
150      * @tc.teardown: Release device A, B, C
151      */
152     if (g_kvDelegatePtr != nullptr) {
153         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
154         g_kvDelegatePtr = nullptr;
155         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
156         LOGD("delete kv store status %d", status);
157         ASSERT_TRUE(status == OK);
158     }
159     if (g_deviceB != nullptr) {
160         delete g_deviceB;
161         g_deviceB = nullptr;
162     }
163     if (g_deviceC != nullptr) {
164         delete g_deviceC;
165         g_deviceC = nullptr;
166     }
167     if (g_communicatorAggregator != nullptr) {
168         g_communicatorAggregator->RegOnDispatch(nullptr);
169     }
170 }
171 
172 /**
173  * @tc.name: sec option check Sync 001
174  * @tc.desc: if sec option not equal, forbid sync
175  * @tc.type: FUNC
176  * @tc.require:
177  * @tc.author: wangchuanqing
178  */
179 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck001, TestSize.Level1)
180 {
181     DBStatus status = OK;
182     std::vector<std::string> devices;
183     devices.push_back(g_deviceB->GetDeviceId());
184     devices.push_back(g_deviceC->GetDeviceId());
185 
186     /**
187      * @tc.steps: step1. deviceA put {k1, v1}
188      */
189     Key key = {'1'};
190     Value value = {'1'};
191     status = g_kvDelegatePtr->Put(key, value);
192     ASSERT_TRUE(status == OK);
193 
194     ASSERT_TRUE(g_syncInterfaceB != nullptr);
195     ASSERT_TRUE(g_syncInterfaceC != nullptr);
196     SecurityOption secOption{SecurityLabel::S4, SecurityFlag::ECE};
197     g_syncInterfaceB->SetSecurityOption(secOption);
198     g_syncInterfaceC->SetSecurityOption(secOption);
199 
200     /**
201      * @tc.steps: step2. deviceA call sync and wait
202      * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
203      */
204     std::map<std::string, DBStatus> result;
205     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
206     ASSERT_TRUE(status == OK);
207 
208     ASSERT_TRUE(result.size() == devices.size());
209     for (const auto &pair : result) {
210         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
211         EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
212     }
213     VirtualDataItem item;
214     g_deviceB->GetData(key, item);
215     EXPECT_TRUE(item.value.empty());
216     g_deviceC->GetData(key, item);
217     EXPECT_TRUE(item.value.empty());
218 }
219 
220 /**
221  * @tc.name: sec option check Sync 002
222  * @tc.desc: if sec option not equal, forbid sync
223  * @tc.type: FUNC
224  * @tc.require:
225  * @tc.author: wangchuanqing
226  */
227 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck002, TestSize.Level1)
228 {
229     DBStatus status = OK;
230     std::vector<std::string> devices;
231     devices.push_back(g_deviceB->GetDeviceId());
232     devices.push_back(g_deviceC->GetDeviceId());
233 
234     /**
235      * @tc.steps: step1. deviceA put {k1, v1}
236      */
237     Key key = {'1'};
238     Value value = {'1'};
239     status = g_kvDelegatePtr->Put(key, value);
240     ASSERT_TRUE(status == OK);
241 
242     ASSERT_TRUE(g_syncInterfaceC != nullptr);
243     SecurityOption secOption{SecurityLabel::S4, SecurityFlag::ECE};
244     g_syncInterfaceC->SetSecurityOption(secOption);
245     secOption.securityLabel = SecurityLabel::S3;
246     secOption.securityFlag = SecurityFlag::SECE;
247     g_syncInterfaceB->SetSecurityOption(secOption);
248 
249     /**
250      * @tc.steps: step2. deviceA call sync and wait
251      * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
252      */
253     std::map<std::string, DBStatus> result;
254     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
255     ASSERT_TRUE(status == OK);
256 
257     ASSERT_TRUE(result.size() == devices.size());
258     for (const auto &pair : result) {
259         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
260         if (pair.first == DEVICE_B) {
261             EXPECT_TRUE(pair.second == OK);
262         } else {
263             EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
264         }
265     }
266     VirtualDataItem item;
267     g_deviceC->GetData(key, item);
268     EXPECT_TRUE(item.value.empty());
269     g_deviceB->GetData(key, item);
270     EXPECT_TRUE(item.value == value);
271 }
272 
273 /**
274  * @tc.name: sec option check Sync 003
275  * @tc.desc: if sec option equal, check not pass, forbid sync
276  * @tc.type: FUNC
277  * @tc.require:
278  * @tc.author: zhangqiquan
279  */
280 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck003, TestSize.Level1)
281 {
282     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
283     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anonde2bd9670202(const std::string &, const SecurityOption &) 284     adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
285         return false;
286     });
287     /**
288      * @tc.steps: step1. record packet
289      * @tc.expected: step1. sync should failed in source.
290      */
291     std::atomic<int> messageCount = 0;
__anonde2bd9670302(const std::string &, Message *) 292     g_communicatorAggregator->RegOnDispatch([&messageCount](const std::string &, Message *) {
293         messageCount++;
294     });
295     /**
296      * @tc.steps: step2. deviceA call sync and wait
297      * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
298      */
299     DBStatus status = OK;
300     std::vector<std::string> devices;
301     devices.push_back(g_deviceB->GetDeviceId());
302     std::map<std::string, DBStatus> result;
303     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
304     EXPECT_EQ(status, OK);
305     EXPECT_EQ(messageCount, 4); // 4 = 2 time sync + 2 ability sync
306     for (const auto &pair : result) {
307         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
308         EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
309     }
310     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
311     g_communicatorAggregator->RegOnDispatch(nullptr);
312 }
313 
314 /**
315  * @tc.name: sec option check Sync 004
316  * @tc.desc: memory db not check device security
317  * @tc.type: FUNC
318  * @tc.require:
319  * @tc.author: zhangqiquan
320  */
321 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck004, TestSize.Level1)
322 {
323     ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
324     g_kvDelegatePtr = nullptr;
325     KvStoreNbDelegate::Option option;
326     option.secOption.securityLabel = SecurityLabel::NOT_SET;
327     option.isMemoryDb = true;
328     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
329     ASSERT_TRUE(g_kvDelegateStatus == OK);
330     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
331 
332     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
333     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anonde2bd9670402(const std::string &, const SecurityOption &) 334     adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
335         return false;
336     });
__anonde2bd9670502(const std::string &, SecurityOption &securityOption) 337     adapter->ForkGetSecurityOption([](const std::string &, SecurityOption &securityOption) {
338         securityOption.securityLabel = NOT_SET;
339         return OK;
340     });
__anonde2bd9670602(SecurityOption &) 341     g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &) {
342         return -E_NOT_SUPPORT;
343     });
344 
345     std::vector<std::string> devices;
346     devices.push_back(g_deviceB->GetDeviceId());
347     std::map<std::string, DBStatus> result;
348     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
349     EXPECT_EQ(status, OK);
350     for (const auto &pair : result) {
351         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
352         EXPECT_TRUE(pair.second == OK);
353     }
354 
355     adapter->ForkCheckDeviceSecurityAbility(nullptr);
356     adapter->ForkGetSecurityOption(nullptr);
357     g_syncInterfaceB->ForkGetSecurityOption(nullptr);
358 }
359 
360 /**
361  * @tc.name: sec option check Sync 005
362  * @tc.desc: if sec option equal, check not pass, forbid sync
363  * @tc.type: FUNC
364  * @tc.require:
365  * @tc.author: zhangqiquan
366  */
367 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck005, TestSize.Level1)
368 {
369     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
370     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anonde2bd9670702(SecurityOption &option) 371     g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &option) {
372         option.securityLabel = NOT_SET;
373         return E_OK;
374     });
__anonde2bd9670802(const std::string &, SecurityOption &securityOption) 375     adapter->ForkGetSecurityOption([](const std::string &, SecurityOption &securityOption) {
376         securityOption.securityLabel = NOT_SET;
377         return OK;
378     });
379 
380     std::vector<std::string> devices;
381     devices.push_back(g_deviceB->GetDeviceId());
382     std::map<std::string, DBStatus> result;
383     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
384     EXPECT_EQ(status, OK);
385     for (const auto &pair : result) {
386         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
387         EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
388     }
389 
390     adapter->ForkCheckDeviceSecurityAbility(nullptr);
391     adapter->ForkGetSecurityOption(nullptr);
392     g_syncInterfaceB->ForkGetSecurityOption(nullptr);
393 }
394 
395 /**
396  * @tc.name: sec option check Sync 006
397  * @tc.desc: memory db not check device security
398  * @tc.type: FUNC
399  * @tc.require:
400  * @tc.author: zhangqiquan
401  */
402 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck006, TestSize.Level0)
403 {
404     ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
405     ASSERT_EQ(g_mgr.DeleteKvStore(STORE_ID), OK);
406     g_kvDelegatePtr = nullptr;
407     KvStoreNbDelegate::Option option;
408     option.secOption.securityLabel = SecurityLabel::S1;
409     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
410     ASSERT_TRUE(g_kvDelegateStatus == OK);
411     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
412 
413     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
414     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anonde2bd9670902(const std::string &, const SecurityOption &) 415     adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
416         return true;
417     });
__anonde2bd9670a02(const std::string &, SecurityOption &securityOption) 418     adapter->ForkGetSecurityOption([](const std::string &, SecurityOption &securityOption) {
419         securityOption.securityLabel = S1;
420         return OK;
421     });
__anonde2bd9670b02(SecurityOption &option) 422     g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &option) {
423         option.securityLabel = SecurityLabel::S0;
424         return E_OK;
425     });
426 
427     std::vector<std::string> devices;
428     devices.push_back(g_deviceB->GetDeviceId());
429     std::map<std::string, DBStatus> result;
430     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
431     EXPECT_EQ(status, OK);
432     for (const auto &pair : result) {
433         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
434         EXPECT_TRUE(pair.second == OK);
435     }
436 
437     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(std::make_shared<ProcessSystemApiAdapterImpl>());
438     g_syncInterfaceB->ForkGetSecurityOption(nullptr);
439 }
440 
441 /**
442  * @tc.name: sec option check Sync 007
443  * @tc.desc: sync should send security option
444  * @tc.type: FUNC
445  * @tc.require:
446  * @tc.author: zhangqiquan
447  */
448 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck007, TestSize.Level0)
449 {
450     /**
451      * @tc.steps: step1. fork check device security ability
452      * @tc.expected: step1. check param option should be S3 SECE.
453      */
454     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
455     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anonde2bd9670c02(const std::string &, const SecurityOption &option) 456     adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &option) {
457         EXPECT_EQ(option.securityLabel, SecurityLabel::S3);
458         EXPECT_EQ(option.securityFlag, SecurityFlag::SECE);
459         return true;
460     });
461     /**
462      * @tc.steps: step2. sync twice
463      * @tc.expected: step2. sync success.
464      */
465     std::vector<std::string> devices;
466     devices.push_back(g_deviceB->GetDeviceId());
467     std::map<std::string, DBStatus> result;
468     g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
469     auto status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
470     ASSERT_TRUE(status == OK);
471     ASSERT_TRUE(result.size() == devices.size());
472     for (const auto &pair : result) {
473         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
474         EXPECT_TRUE(pair.second == OK);
475     }
476     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
477 }
478 
479 /**
480  * @tc.name: SecOptionCheck008
481  * @tc.desc: pull compress sync when check device ability fail
482  * @tc.type: FUNC
483  * @tc.require:
484  * @tc.author: zhangqiquan
485  */
486 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck008, TestSize.Level0)
487 {
488     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
489     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
490     auto deviceB = g_deviceB->GetDeviceId();
__anonde2bd9670d02(const std::string &dev, const SecurityOption &) 491     adapter->ForkCheckDeviceSecurityAbility([deviceB](const std::string &dev, const SecurityOption &) {
492         if (dev != "real_device") {
493             return true;
494         }
495         return false;
496     });
__anonde2bd9670e02(SecurityOption &option) 497     g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &option) {
498         option.securityLabel = SecurityLabel::S3;
499         option.securityFlag = SecurityFlag::SECE;
500         return E_OK;
501     });
502     g_syncInterfaceB->SetCompressSync(true);
503     std::vector<std::string> devices;
504     devices.push_back(deviceB);
505     std::map<std::string, DBStatus> result;
506     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
507     EXPECT_EQ(status, OK);
508     for (const auto &pair : result) {
509         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
510         EXPECT_EQ(pair.second, SECURITY_OPTION_CHECK_ERROR);
511     }
512 
513     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(std::make_shared<ProcessSystemApiAdapterImpl>());
514     g_syncInterfaceB->ForkGetSecurityOption(nullptr);
515     g_syncInterfaceB->SetCompressSync(false);
516 }
517 
518 /**
519  * @tc.name: SyncProcess001
520  * @tc.desc: sync process pull mode.
521  * @tc.type: FUNC
522  * @tc.require:
523  * @tc.author: chenghuitao
524  */
525 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncProcess001, TestSize.Level1)
526 {
527     std::vector<std::string> devices;
528     devices.push_back(g_deviceB->GetDeviceId());
529     devices.push_back(g_deviceC->GetDeviceId());
530 
531     /**
532      * @tc.steps: step1. deviceB deviceC put bigData
533      */
534     std::vector<Entry> entries;
535     const int dataCount = 10;
536     DistributedDBUnitTest::GenerateNumberEntryVector(dataCount, entries);
537 
538     for (uint32_t i = 0; i < entries.size(); i++) {
539         if (i % 2 == 0) {
540             g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
541         } else {
542             g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
543         }
544     }
545 
546     /**
547      * @tc.steps: step2. deviceA call pull sync
548      * @tc.expected: step2. sync should return OK.
549      */
550     std::map<std::string, DeviceSyncProcess> syncProcessMap;
551     DeviceSyncOption option;
552     option.devices = devices;
553     option.mode = SYNC_MODE_PULL_ONLY;
554     option.isQuery = false;
555     option.isWait = false;
556     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, option, syncProcessMap);
557     EXPECT_EQ(status, DBStatus::OK);
558 
559     /**
560      * @tc.expected: step3. onProcess should be called, DeviceA have all bigData
561      */
562     for (const auto &entry : entries) {
563         Value value;
564         EXPECT_EQ(g_kvDelegatePtr->Get(entry.key, value), DBStatus::OK);
565         EXPECT_EQ(value, entry.value);
566     }
567 
568     for (const auto &entry : syncProcessMap) {
569         LOGD("[SyncProcess001] dev %s, status %d, totalCount %u, finishedCount %u", entry.first.c_str(),
570             entry.second.errCode, entry.second.pullInfo.total, entry.second.pullInfo.finishedCount);
571         EXPECT_EQ(entry.second.errCode, OK);
572         EXPECT_EQ(entry.second.process, ProcessStatus::FINISHED);
573         EXPECT_EQ(entry.second.pullInfo.total, static_cast<uint32_t>(dataCount / 2));
574         EXPECT_EQ(entry.second.pullInfo.finishedCount, static_cast<uint32_t>(dataCount / 2));
575         ASSERT_TRUE(entry.second.syncId > 0);
576     }
577 }
578 
579 /**
580  * @tc.name: SyncProcess002
581  * @tc.desc: sync process pull mode.
582  * @tc.type: FUNC
583  * @tc.require:
584  * @tc.author: chenghuitao
585  */
586 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncProcess002, TestSize.Level1)
587 {
588     /**
589      * @tc.steps: step1. deviceA put bigData
590      */
591     std::vector<Entry> entries;
592     const int dataCount = 10;
593     DistributedDBUnitTest::GenerateNumberEntryVector(dataCount, entries);
594 
595     for (uint32_t i = 0; i < entries.size(); i++) {
596         g_kvDelegatePtr->Put(entries[i].key, entries[i].value);
597     }
598 
599     /**
600      * @tc.steps: step2. virtual deviceB call pull sync
601      * @tc.expected: step2. sync should return OK.
602      */
603     std::map<std::string, DeviceSyncProcess> syncProcessMap;
604     DeviceSyncOption option;
605     option.mode = SYNC_MODE_PULL_ONLY;
606     option.isQuery = false;
607     option.isWait = true;
608     uint32_t processCount = 0;
609     std::vector<ProcessStatus> statuses = {ProcessStatus::PREPARED, ProcessStatus::PROCESSING, ProcessStatus::FINISHED};
610     DeviceSyncProcessCallback onProcess =
__anonde2bd9670f02(const std::map<std::string, DeviceSyncProcess> &processMap) 611         [&](const std::map<std::string, DeviceSyncProcess> &processMap) {
612             syncProcessMap = processMap;
613             for (const auto &entry : processMap) {
614                 LOGD("[SyncProcess002-onProcess] dev %s, status %d, process %d", entry.first.c_str(),
615                     entry.second.errCode, entry.second.process);
616                 EXPECT_EQ(entry.second.errCode, DBStatus::OK);
617                 EXPECT_EQ(entry.second.process, statuses[processCount]);
618                 // total and finishedCount must be greater than 0 when processing
619                 if (entry.second.process == ProcessStatus::PROCESSING) {
620                     EXPECT_TRUE(entry.second.pullInfo.total > 0);
621                     EXPECT_TRUE(entry.second.pullInfo.finishedCount > 0);
622                 }
623             }
624             processCount++;
625         };
626     int status = g_deviceB->Sync(option, onProcess);
627     EXPECT_EQ(status, E_OK);
628 
629     /**
630      * @tc.expected: step3. onProcess should be called, DeviceB have all bigData
631      */
632     for (const auto &entry : entries) {
633         VirtualDataItem item;
634         EXPECT_EQ(g_deviceB->GetData(entry.key, item), E_OK);
635         EXPECT_EQ(item.value, entry.value);
636     }
637 
638     for (const auto &entry : syncProcessMap) {
639         LOGD("[SyncProcess002] dev %s, status %d, totalCount %u, finishedCount %u", entry.first.c_str(),
640             entry.second.errCode, entry.second.pullInfo.total, entry.second.pullInfo.finishedCount);
641         EXPECT_EQ(entry.second.errCode, DBStatus::OK);
642         EXPECT_EQ(entry.second.process, ProcessStatus::FINISHED);
643         EXPECT_EQ(entry.second.pullInfo.total, static_cast<uint32_t>(dataCount));
644         EXPECT_EQ(entry.second.pullInfo.finishedCount, static_cast<uint32_t>(dataCount));
645         ASSERT_TRUE(entry.second.syncId > 0);
646     }
647 }
648 
649 /**
650  * @tc.name: SyncProcess003
651  * @tc.desc: sync process pull mode with QUERY.
652  * @tc.type: FUNC
653  * @tc.require:
654  * @tc.author: chenghuitao
655  */
656 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncProcess003, TestSize.Level1)
657 {
658     /**
659      * @tc.steps: step1. deviceA put bigData
660      */
661     std::vector<Entry> entries;
662     const int dataCount = 10;
663     DistributedDBUnitTest::GenerateNumberEntryVector(dataCount, entries);
664 
665     for (uint32_t i = 0; i < entries.size(); i++) {
666         g_kvDelegatePtr->Put(entries[i].key, entries[i].value);
667     }
668 
669     /**
670      * @tc.steps: step2. virtual deviceB call pull sync
671      * @tc.expected: step2. sync should return OK.
672      */
673     std::map<std::string, DeviceSyncProcess> syncProcessMap;
674     DeviceSyncOption option;
675     option.mode = SYNC_MODE_PULL_ONLY;
676     option.isQuery = true;
677     option.isWait = true;
678     option.query = Query::Select().Limit(5);
679     DeviceSyncProcessCallback onProcess =
__anonde2bd9671002(const std::map<std::string, DeviceSyncProcess> &processMap) 680         [&syncProcessMap, this](const std::map<std::string, DeviceSyncProcess> &processMap) {
681             syncProcessMap = processMap;
682         };
683     int status = g_deviceB->Sync(option, onProcess);
684     EXPECT_EQ(status, E_OK);
685 
686     /**
687      * @tc.expected: step3. onProcess should be called, DeviceB have all bigData
688      */
689     for (const auto &entry : std::vector<Entry>(entries.begin(), entries.begin() + 5)) {
690         VirtualDataItem item;
691         EXPECT_EQ(g_deviceB->GetData(entry.key, item), E_OK);
692         EXPECT_EQ(item.value, entry.value);
693     }
694 
695     for (const auto &entry : syncProcessMap) {
696         LOGD("[SyncProcess003] dev %s, status %d, totalCount %u, finishedCount %u", entry.first.c_str(),
697             entry.second.errCode, entry.second.pullInfo.total, entry.second.pullInfo.finishedCount);
698         EXPECT_EQ(entry.second.errCode, DBStatus::OK);
699         EXPECT_EQ(entry.second.process, ProcessStatus::FINISHED);
700         EXPECT_EQ(entry.second.pullInfo.total, 5u);
701         EXPECT_EQ(entry.second.pullInfo.finishedCount, 5u);
702         ASSERT_TRUE(entry.second.syncId > 0);
703     }
704 }
705 
706 #ifndef LOW_LEVEL_MEM_DEV
707 /**
708  * @tc.name: BigDataSync001
709  * @tc.desc: big data sync push mode.
710  * @tc.type: FUNC
711  * @tc.require:
712  * @tc.author: wangchuanqing
713  */
714 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync001, TestSize.Level1)
715 {
716     DBStatus status = OK;
717     std::vector<std::string> devices;
718     devices.push_back(g_deviceB->GetDeviceId());
719     devices.push_back(g_deviceC->GetDeviceId());
720 
721     /**
722      * @tc.steps: step1. deviceA put 16 bigData
723      */
724     std::vector<Entry> entries;
725     std::vector<Key> keys;
726     DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
727     for (const auto &entry : entries) {
728         status = g_kvDelegatePtr->Put(entry.key, entry.value);
729         ASSERT_TRUE(status == OK);
730     }
731 
732     /**
733      * @tc.steps: step2. deviceA call sync and wait
734      * @tc.expected: step2. sync should return OK.
735      */
736     std::map<std::string, DBStatus> result;
737     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
738     ASSERT_TRUE(status == OK);
739 
740     /**
741      * @tc.expected: step2. onComplete should be called, DeviceB,C have {k1,v1}
742      */
743     ASSERT_TRUE(result.size() == devices.size());
744     for (const auto &pair : result) {
745         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
746         EXPECT_TRUE(pair.second == OK);
747     }
748     VirtualDataItem item;
749     for (const auto &entry : entries) {
750         item.value.clear();
751         g_deviceB->GetData(entry.key, item);
752         EXPECT_TRUE(item.value == entry.value);
753         item.value.clear();
754         g_deviceC->GetData(entry.key, item);
755         EXPECT_TRUE(item.value == entry.value);
756     }
757 }
758 
759 /**
760  * @tc.name: BigDataSync002
761  * @tc.desc: big data sync pull mode.
762  * @tc.type: FUNC
763  * @tc.require:
764  * @tc.author: wangchuanqing
765  */
766 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync002, TestSize.Level1)
767 {
768     DBStatus status = OK;
769     std::vector<std::string> devices;
770     devices.push_back(g_deviceB->GetDeviceId());
771     devices.push_back(g_deviceC->GetDeviceId());
772 
773     /**
774      * @tc.steps: step1. deviceA deviceB put bigData
775      */
776     std::vector<Entry> entries;
777     std::vector<Key> keys;
778     DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
779 
780     for (uint32_t i = 0; i < entries.size(); i++) {
781         if (i % 2 == 0) {
782             g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
783         } else {
784             g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
785         }
786     }
787 
788     /**
789      * @tc.steps: step3. deviceA call pull sync
790      * @tc.expected: step3. sync should return OK.
791      */
792     std::map<std::string, DBStatus> result;
793     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
794     ASSERT_TRUE(status == OK);
795 
796     /**
797      * @tc.expected: step3. onComplete should be called, DeviceA have all bigData
798      */
799     ASSERT_TRUE(result.size() == devices.size());
800     for (const auto &pair : result) {
801         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
802         EXPECT_TRUE(pair.second == OK);
803     }
804     for (const auto &entry : entries) {
805         Value value;
806         EXPECT_EQ(g_kvDelegatePtr->Get(entry.key, value), OK);
807         EXPECT_EQ(value, entry.value);
808     }
809 }
810 
811 /**
812  * @tc.name: BigDataSync003
813  * @tc.desc: big data sync pushAndPull mode.
814  * @tc.type: FUNC
815  * @tc.require:
816  * @tc.author: wangchuanqing
817  */
818 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync003, TestSize.Level1)
819 {
820     DBStatus status = OK;
821     std::vector<std::string> devices;
822     devices.push_back(g_deviceB->GetDeviceId());
823     devices.push_back(g_deviceC->GetDeviceId());
824 
825     /**
826      * @tc.steps: step1. deviceA deviceB put bigData
827      */
828     std::vector<Entry> entries;
829     std::vector<Key> keys;
830     DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
831 
832     for (uint32_t i = 0; i < entries.size(); i++) {
833         if (i % 3 == 0) { // 0 3 6 9 12 15 for deivec B
834             g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
835         } else if (i % 3 == 1) { // 1 4 7 10 13 16 for device C
836             g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
837         } else { // 2 5 8 11 14 for device A
838             status = g_kvDelegatePtr->Put(entries[i].key, entries[i].value);
839             ASSERT_TRUE(status == OK);
840         }
841     }
842 
843     /**
844      * @tc.steps: step3. deviceA call pushAndpull sync
845      * @tc.expected: step3. sync should return OK.
846      */
847     std::map<std::string, DBStatus> result;
848     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
849     ASSERT_TRUE(status == OK);
850 
851     /**
852      * @tc.expected: step3. onComplete should be called, DeviceA have all bigData
853      * deviceB and deviceC has deviceA data
854      */
855     ASSERT_TRUE(result.size() == devices.size());
856     for (const auto &pair : result) {
857         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
858         EXPECT_TRUE(pair.second == OK);
859     }
860 
861     VirtualDataItem item;
862     for (uint32_t i = 0; i < entries.size(); i++) {
863         Value value;
864         EXPECT_EQ(g_kvDelegatePtr->Get(entries[i].key, value), OK);
865         EXPECT_EQ(value, entries[i].value);
866 
867         if (i % 3 == 2) { // 2 5 8 11 14 for device A
868         item.value.clear();
869         g_deviceB->GetData(entries[i].key, item);
870         EXPECT_TRUE(item.value == entries[i].value);
871         item.value.clear();
872         g_deviceC->GetData(entries[i].key, item);
873         EXPECT_TRUE(item.value == entries[i].value);
874         }
875     }
876 }
877 #endif
878 
CancelTestInit(DeviceSyncOption & option,std::vector<Entry> & entries,const uint32_t mtuSize)879 void DistributedDBSingleVerP2PSyncCheckTest::CancelTestInit(DeviceSyncOption &option, std::vector<Entry> &entries,
880     const uint32_t mtuSize)
881 {
882     option.devices.push_back(g_deviceB->GetDeviceId());
883     option.devices.push_back(g_deviceC->GetDeviceId());
884     option.mode = SYNC_MODE_PULL_ONLY;
885     option.isQuery = false;
886     option.isWait = false;
887 
888     std::vector<Key> keys;
889     const uint32_t entriesSize = 14000u;
890     const int keySize = 20;
891     DistributedDBUnitTest::GenerateRecords(entriesSize, entries, keys, keySize, mtuSize);
892     for (uint32_t i = 0; i < entries.size(); i++) {
893         if (i % option.devices.size() == 0) {
894             g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
895         } else {
896             g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
897         }
898     }
899 
900     g_communicatorAggregator->SetDeviceMtuSize("real_device", mtuSize);
901     g_communicatorAggregator->SetDeviceMtuSize(DEVICE_C, mtuSize);
902     g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, mtuSize);
903 }
904 
CancelTestEnd(std::vector<Entry> & entries,const uint32_t mtuSize)905 void DistributedDBSingleVerP2PSyncCheckTest::CancelTestEnd(std::vector<Entry> &entries, const uint32_t mtuSize)
906 {
907     size_t syncSuccCount = 0;
908     for (uint32_t i = 0; i < entries.size(); i++) {
909         Value value;
910         if (g_kvDelegatePtr->Get(entries[i].key, value) == OK) {
911             syncSuccCount++;
912             EXPECT_EQ(value, entries[i].value);
913         }
914     }
915     EXPECT_GT(syncSuccCount, static_cast<size_t>(0));
916     EXPECT_LT(syncSuccCount, entries.size());
917     uint32_t mtu = 5u;
918     g_communicatorAggregator->SetDeviceMtuSize("real_device", mtu * mtuSize * mtuSize);
919     g_communicatorAggregator->SetDeviceMtuSize(DEVICE_C, mtu * mtuSize * mtuSize);
920     g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, mtu * mtuSize * mtuSize);
921     g_communicatorAggregator->RegBeforeDispatch(nullptr);
922 }
923 
924 /**
925  * @tc.name: CancelSyncProcess001
926  * @tc.desc: cancel data sync process pull mode.
927  * @tc.type: FUNC
928  * @tc.require:
929  * @tc.author: lijun
930  */
931 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncProcessCancel001, TestSize.Level0)
932 {
933     DeviceSyncOption option;
934     std::vector<Entry> entries;
935     const uint32_t mtuSize = 8u;
936     /**
937      * @tc.steps: step1. deviceB deviceC put data
938     */
939     CancelTestInit(option, entries, mtuSize);
940     uint32_t syncId;
941     std::mutex tempMutex;
942     bool isFirst = true;
__anonde2bd9671102(const std::string &dstTarget, const Message *msg) 943     g_communicatorAggregator->RegBeforeDispatch([&](const std::string &dstTarget, const Message *msg) {
944         if (dstTarget == "real_device" && msg->GetMessageType() == TYPE_REQUEST &&
945             msg->GetMessageId() == DATA_SYNC_MESSAGE) {
946             tempMutex.lock();
947             if (isFirst == true) {
948                 isFirst = false;
949                 /**
950                 * @tc.steps: step3. cancel sync
951                 * @tc.expected: step3. should return OK.
952                 */
953                 ASSERT_TRUE(g_kvDelegatePtr->CancelSync(syncId) == OK);
954                 tempMutex.unlock();
955                 std::this_thread::sleep_for(std::chrono::seconds(1));
956                 return;
957             }
958             tempMutex.unlock();
959         }
960     });
961 
962     std::mutex cancelMtx;
963     std::condition_variable cancelCv;
964     bool cancalFinished = false;
965 
__anonde2bd9671202(const std::map<std::string, DeviceSyncProcess> &processMap) 966     DeviceSyncProcessCallback onProcess = [&](const std::map<std::string, DeviceSyncProcess> &processMap) {
967         bool isAllCancel = true;
968         for (auto &process: processMap) {
969             syncId = process.second.syncId;
970             if (process.second.errCode != COMM_FAILURE) {
971                 isAllCancel = false;
972             }
973         }
974         if (isAllCancel) {
975             std::unique_lock<std::mutex> lock(cancelMtx);
976             cancalFinished = true;
977             cancelCv.notify_all();
978         }
979     };
980     /**
981      * @tc.steps: step2. deviceA call pull sync
982      * @tc.expected: step2. sync should return OK.
983      */
984     ASSERT_TRUE(g_kvDelegatePtr->Sync(option, onProcess) == OK);
985 
986     // Wait onProcess complete.
987     {
988         std::unique_lock<std::mutex> lock2(cancelMtx);
__anonde2bd9671302() 989         cancelCv.wait(lock2, [&cancalFinished]() {return cancalFinished;});
990     }
991     // Wait until all the packets arrive.
992     std::this_thread::sleep_for(std::chrono::seconds(2));
993 
994     /**
995      * @tc.steps: step4. Cancel abnormal syncId.
996      * @tc.expected: step4. return NOT_FOUND.
997      */
998     ASSERT_TRUE(g_kvDelegatePtr->CancelSync(syncId) == NOT_FOUND);
999     ASSERT_TRUE(g_kvDelegatePtr->CancelSync(0) == NOT_FOUND);
1000     ASSERT_TRUE(g_kvDelegatePtr->CancelSync(4294967295) == NOT_FOUND); // uint32_t max value 4294967295
1001     CancelTestEnd(entries, mtuSize);
1002 }
1003 
1004 /**
1005  * @tc.name: PushFinishedNotify 001
1006  * @tc.desc: Test remote device push finished notify function.
1007  * @tc.type: FUNC
1008  * @tc.require:
1009  * @tc.author: xushaohua
1010  */
1011 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, PushFinishedNotify001, TestSize.Level1)
1012 {
1013     std::vector<std::string> devices;
1014     devices.push_back(g_deviceB->GetDeviceId());
1015 
1016     /**
1017      * @tc.steps: step1. deviceA call SetRemotePushFinishedNotify
1018      * @tc.expected: step1. set should return OK.
1019      */
1020     int pushfinishedFlag = 0;
1021     DBStatus status = g_kvDelegatePtr->SetRemotePushFinishedNotify(
__anonde2bd9671402(const RemotePushNotifyInfo &info) 1022         [&pushfinishedFlag](const RemotePushNotifyInfo &info) {
1023             EXPECT_TRUE(info.deviceId == DEVICE_B);
1024             pushfinishedFlag = 1;
1025     });
1026     ASSERT_EQ(status, OK);
1027 
1028     /**
1029      * @tc.steps: step2. deviceB put k2, v2, and deviceA pull from deviceB
1030      * @tc.expected: step2. deviceA can not receive push finished notify
1031      */
1032     EXPECT_EQ(g_kvDelegatePtr->Put(KEY_2, VALUE_2), OK);
1033     std::map<std::string, DBStatus> result;
1034     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
1035     EXPECT_TRUE(status == OK);
1036     EXPECT_EQ(pushfinishedFlag, 0);
1037     pushfinishedFlag = 0;
1038 
1039     /**
1040      * @tc.steps: step3. deviceB put k3, v3, and deviceA push and pull to deviceB
1041      * @tc.expected: step3. deviceA can not receive push finished notify
1042      */
1043     EXPECT_EQ(g_kvDelegatePtr->Put(KEY_3, VALUE_3), OK);
1044     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
1045     EXPECT_TRUE(status == OK);
1046     EXPECT_EQ(pushfinishedFlag, 0);
1047     pushfinishedFlag = 0;
1048 
1049     /**
1050      * @tc.steps: step4. deviceA call SetRemotePushFinishedNotify to reset notify
1051      * @tc.expected: step4. set should return OK.
1052      */
__anonde2bd9671502(const RemotePushNotifyInfo &info) 1053     status = g_kvDelegatePtr->SetRemotePushFinishedNotify([&pushfinishedFlag](const RemotePushNotifyInfo &info) {
1054         EXPECT_TRUE(info.deviceId == DEVICE_B);
1055         pushfinishedFlag = 2;
1056     });
1057     ASSERT_EQ(status, OK);
1058 
1059     /**
1060      * @tc.steps: step5. deviceA call SetRemotePushFinishedNotify set null to unregist
1061      * @tc.expected: step5. set should return OK.
1062      */
1063     status = g_kvDelegatePtr->SetRemotePushFinishedNotify(nullptr);
1064     ASSERT_EQ(status, OK);
1065 }
1066 
1067 namespace {
RegOnDispatchWithDelayAck(bool & errCodeAck,bool & afterErrAck)1068 void RegOnDispatchWithDelayAck(bool &errCodeAck, bool &afterErrAck)
1069 {
1070     // just delay the busy ack
1071     g_communicatorAggregator->RegOnDispatch([&errCodeAck, &afterErrAck](const std::string &dev, Message *inMsg) {
1072         if (dev != g_deviceB->GetDeviceId()) {
1073             return;
1074         }
1075         auto *packet = inMsg->GetObject<DataAckPacket>();
1076         if (packet != nullptr && packet->GetRecvCode() == -E_BUSY) {
1077             errCodeAck = true;
1078             while (!afterErrAck) {
1079             }
1080             LOGW("NOW SEND BUSY ACK");
1081         } else if (errCodeAck) {
1082             afterErrAck = true;
1083             std::this_thread::sleep_for(std::chrono::seconds(1));
1084         }
1085     });
1086 }
1087 
RegOnDispatchWithOffline(bool & offlineFlag,bool & invalid,condition_variable & conditionOffline)1088 void RegOnDispatchWithOffline(bool &offlineFlag, bool &invalid, condition_variable &conditionOffline)
1089 {
1090     g_communicatorAggregator->RegOnDispatch([&offlineFlag, &invalid, &conditionOffline](
1091                                                 const std::string &dev, Message *inMsg) {
1092         auto *packet = inMsg->GetObject<DataAckPacket>();
1093         if (dev != DEVICE_B) {
1094             if (packet != nullptr && (packet->GetRecvCode() == LOCAL_WATER_MARK_NOT_INIT)) {
1095                 offlineFlag = true;
1096                 conditionOffline.notify_all();
1097                 LOGW("[Dispatch] NOTIFY OFFLINE");
1098                 std::this_thread::sleep_for(std::chrono::microseconds(EIGHT_HUNDRED));
1099             }
1100         } else if (!invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
1101             LOGW("[Dispatch] NOW INVALID THIS MSG");
1102             inMsg->SetMessageType(TYPE_INVALID);
1103             inMsg->SetMessageId(INVALID_MESSAGE_ID);
1104             invalid = true;
1105         }
1106     });
1107 }
1108 
RegOnDispatchWithInvalidMsg(bool & invalid)1109 void RegOnDispatchWithInvalidMsg(bool &invalid)
1110 {
1111     g_communicatorAggregator->RegOnDispatch([&invalid](
1112         const std::string &dev, Message *inMsg) {
1113         if (dev == DEVICE_B && !invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
1114             LOGW("[Dispatch] NOW INVALID THIS MSG");
1115             inMsg->SetMessageType(TYPE_INVALID);
1116             inMsg->SetMessageId(INVALID_MESSAGE_ID);
1117             invalid = true;
1118         }
1119     });
1120 }
1121 
PrepareEnv(vector<std::string> & devices,Key & key,Query & query)1122 void PrepareEnv(vector<std::string> &devices, Key &key, Query &query)
1123 {
1124     /**
1125      * @tc.steps: step1. ensure the watermark is no zero and finish timeSync and abilitySync
1126      * @tc.expected: step1. should return OK.
1127      */
1128     Value value = {'1'};
1129     std::map<std::string, DBStatus> result;
1130     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1131 
1132     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result, query);
1133     EXPECT_TRUE(status == OK);
1134     ASSERT_TRUE(result[g_deviceB->GetDeviceId()] == OK);
1135 }
1136 
Sync(KvStoreNbDelegate * kvDelegatePtr,vector<std::string> & devices,const DBStatus & targetStatus)1137 void Sync(KvStoreNbDelegate *kvDelegatePtr, vector<std::string> &devices, const DBStatus &targetStatus)
1138 {
1139     std::map<std::string, DBStatus> result;
1140     DBStatus status = g_tool.SyncTest(kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result);
1141     EXPECT_TRUE(status == OK);
1142     for (const auto &deviceId : devices) {
1143         ASSERT_TRUE(result[deviceId] == targetStatus);
1144     }
1145 }
1146 
Sync(vector<std::string> & devices,const DBStatus & targetStatus)1147 void Sync(vector<std::string> &devices, const DBStatus &targetStatus)
1148 {
1149     Sync(g_kvDelegatePtr, devices, targetStatus);
1150 }
1151 
SyncWithQuery(vector<std::string> & devices,const Query & query,const SyncMode & mode,const DBStatus & targetStatus)1152 void SyncWithQuery(vector<std::string> &devices, const Query &query, const SyncMode &mode,
1153     const DBStatus &targetStatus)
1154 {
1155     std::map<std::string, DBStatus> result;
1156     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, mode, result, query);
1157     EXPECT_TRUE(status == OK);
1158     for (const auto &deviceId : devices) {
1159         if (targetStatus == COMM_FAILURE) {
1160             // If syncTaskContext of deviceB is scheduled to be executed first, ClearAllSyncTask is
1161             // invoked when OfflineHandleByDevice is triggered, and SyncOperation::Finished() is triggered in advance.
1162             // The returned status is COMM_FAILURE.
1163             // If syncTaskContext of deviceB is not executed first, the error code is transparently transmitted.
1164             EXPECT_TRUE((result[deviceId] == static_cast<DBStatus>(-E_PERIPHERAL_INTERFACE_FAIL)) ||
1165                 (result[deviceId] == COMM_FAILURE));
1166         } else {
1167             ASSERT_EQ(result[deviceId], targetStatus);
1168         }
1169     }
1170 }
1171 
SyncWithQuery(vector<std::string> & devices,const Query & query,const DBStatus & targetStatus)1172 void SyncWithQuery(vector<std::string> &devices, const Query &query, const DBStatus &targetStatus)
1173 {
1174     SyncWithQuery(devices, query, DistributedDB::SYNC_MODE_PUSH_ONLY, targetStatus);
1175 }
1176 
SyncWithDeviceOffline(vector<std::string> & devices,Key & key,const Query & query)1177 void SyncWithDeviceOffline(vector<std::string> &devices, Key &key, const Query &query)
1178 {
1179     Value value = {'2'};
1180     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1181 
1182     /**
1183      * @tc.steps: step2. invalid the sync msg
1184      * @tc.expected: step2. should return TIME_OUT.
1185      */
1186     SyncWithQuery(devices, query, TIME_OUT);
1187 
1188     /**
1189      * @tc.steps: step3. device offline when sync
1190      * @tc.expected: step3. should return COMM_FAILURE.
1191      */
1192     SyncWithQuery(devices, query, COMM_FAILURE);
1193 }
1194 
PrepareWaterMarkError(std::vector<std::string> & devices,Query & query)1195 void PrepareWaterMarkError(std::vector<std::string> &devices, Query &query)
1196 {
1197     /**
1198      * @tc.steps: step1. prepare data
1199      */
1200     devices.push_back(g_deviceB->GetDeviceId());
1201     g_deviceB->Online();
1202 
1203     Key key = {'1'};
1204     query = Query::Select().PrefixKey(key);
1205     PrepareEnv(devices, key, query);
1206 
1207     /**
1208      * @tc.steps: step2. query sync and set queryWaterMark
1209      * @tc.expected: step2. should return OK.
1210      */
1211     Value value = {'2'};
1212     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1213     SyncWithQuery(devices, query, OK);
1214 
1215     /**
1216      * @tc.steps: step3. sync and invalid msg for set local device waterMark
1217      * @tc.expected: step3. should return TIME_OUT.
1218      */
1219     bool invalidMsg = false;
1220     RegOnDispatchWithInvalidMsg(invalidMsg);
1221     value = {'3'};
1222     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1223     Sync(devices, TIME_OUT);
1224     g_communicatorAggregator->RegOnDispatch(nullptr);
1225 }
1226 
RegOnDispatchWithoutDataPacket(std::atomic<int> & messageCount,bool calResponse=false)1227 void RegOnDispatchWithoutDataPacket(std::atomic<int> &messageCount, bool calResponse = false)
1228 {
1229     g_communicatorAggregator->RegOnDispatch([calResponse, &messageCount](const std::string &dev, Message *msg) {
1230         if (msg->GetMessageId() != TIME_SYNC_MESSAGE && msg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
1231             return;
1232         }
1233         if (dev != DEVICE_B || (!calResponse && msg->GetMessageType() != TYPE_REQUEST)) {
1234             return;
1235         }
1236         messageCount++;
1237     });
1238 }
1239 
ReOpenDB()1240 void ReOpenDB()
1241 {
1242     ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1243     g_kvDelegatePtr = nullptr;
1244     KvStoreNbDelegate::Option option;
1245     option.secOption.securityLabel = SecurityLabel::S3;
1246     option.secOption.securityFlag = SecurityFlag::SECE;
1247     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1248     ASSERT_TRUE(g_kvDelegateStatus == OK);
1249     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1250 }
1251 }
1252 
1253 /**
1254  * @tc.name: AckSessionCheck 001
1255  * @tc.desc: Test ack session check function.
1256  * @tc.type: FUNC
1257  * @tc.require:
1258  * @tc.author: zhangqiquan
1259  */
1260 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, AckSessionCheck001, TestSize.Level3)
1261 {
1262     std::vector<std::string> devices;
1263     devices.push_back(g_deviceB->GetDeviceId());
1264 
1265     /**
1266      * @tc.steps: step1. deviceB sync to deviceA just for timeSync and abilitySync
1267      * @tc.expected: step1. should return OK.
1268      */
1269     ASSERT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == E_OK);
1270 
1271     /**
1272      * @tc.steps: step2. deviceA StartTransaction for prevent other sync action deviceB sync will fail
1273      * @tc.expected: step2. should return OK.
1274      */
1275     ASSERT_TRUE(g_kvDelegatePtr->StartTransaction() == OK);
1276 
1277     bool errCodeAck = false;
1278     bool afterErrAck = false;
1279     RegOnDispatchWithDelayAck(errCodeAck, afterErrAck);
1280 
1281     Key key = {'1'};
1282     Value value = {'1'};
1283     Timestamp currentTime;
1284     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1285     EXPECT_TRUE(g_deviceB->PutData(key, value, currentTime, 0) == E_OK);
1286     EXPECT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == E_OK);
1287 
1288     Value outValue;
1289     EXPECT_TRUE(g_kvDelegatePtr->Get(key, outValue) == NOT_FOUND);
1290 
1291     /**
1292      * @tc.steps: step3. release the writeHandle and try again, sync success
1293      * @tc.expected: step3. should return OK.
1294      */
1295     EXPECT_TRUE(g_kvDelegatePtr->Commit() == OK);
1296     EXPECT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == E_OK);
1297 
1298     EXPECT_TRUE(g_kvDelegatePtr->Get(key, outValue) == OK);
1299     EXPECT_EQ(outValue, value);
1300 }
1301 
1302 /**
1303  * @tc.name: AckSafeCheck001
1304  * @tc.desc: Test ack session check filter all bad ack in device offline scene.
1305  * @tc.type: FUNC
1306  * @tc.require:
1307  * @tc.author: zhangqiquan
1308  */
1309 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, AckSafeCheck001, TestSize.Level3)
1310 {
1311     std::vector<std::string> devices;
1312     devices.push_back(g_deviceB->GetDeviceId());
1313     g_deviceB->Online();
1314 
1315     Key key = {'1'};
1316     Query query = Query::Select().PrefixKey(key);
1317     PrepareEnv(devices, key, query);
1318 
1319     std::condition_variable conditionOnline;
1320     std::condition_variable conditionOffline;
1321     bool onlineFlag = false;
1322     bool invalid = false;
1323     bool offlineFlag = false;
__anonde2bd9671b02() 1324     thread subThread([&onlineFlag, &conditionOnline, &offlineFlag, &conditionOffline]() {
1325         LOGW("[Dispatch] NOW DEVICES IS OFFLINE");
1326         std::mutex offlineMtx;
1327         std::unique_lock<std::mutex> lck(offlineMtx);
1328         conditionOffline.wait(lck, [&offlineFlag]{ return offlineFlag; });
1329         g_deviceB->Offline();
1330         std::this_thread::sleep_for(std::chrono::seconds(1));
1331         g_deviceB->Online();
1332         onlineFlag = true;
1333         conditionOnline.notify_all();
1334         LOGW("[Dispatch] NOW DEVICES IS ONLINE");
1335     });
1336     subThread.detach();
1337 
1338     RegOnDispatchWithOffline(offlineFlag, invalid, conditionOffline);
1339 
1340     SyncWithDeviceOffline(devices, key, query);
1341 
1342     std::mutex onlineMtx;
1343     std::unique_lock<std::mutex> lck(onlineMtx);
__anonde2bd9671d02null1344     conditionOnline.wait(lck, [&onlineFlag]{ return onlineFlag; });
1345 
1346     /**
1347      * @tc.steps: step4. sync again if has problem it will sync never end
1348      * @tc.expected: step4. should return OK.
1349      */
1350     SyncWithQuery(devices, query, OK);
1351 }
1352 
1353 /**
1354  * @tc.name: WaterMarkCheck001
1355  * @tc.desc: Test waterMark work correct in lost package scene.
1356  * @tc.type: FUNC
1357  * @tc.require:
1358  * @tc.author: zhangqiquan
1359  */
1360 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, WaterMarkCheck001, TestSize.Level1)
1361 {
1362     std::vector<std::string> devices;
1363     Query query = Query::Select();
1364     PrepareWaterMarkError(devices, query);
1365 
1366     /**
1367      * @tc.steps: step4. sync again see it work correct
1368      * @tc.expected: step4. should return OK.
1369      */
1370     SyncWithQuery(devices, query, OK);
1371 }
1372 
1373 /**
1374  * @tc.name: WaterMarkCheck002
1375  * @tc.desc: Test pull work correct in error waterMark scene.
1376  * @tc.type: FUNC
1377  * @tc.require:
1378  * @tc.author: zhangqiquan
1379  */
1380 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, WaterMarkCheck002, TestSize.Level1)
1381 {
1382     std::vector<std::string> devices;
1383     Query query = Query::Select();
1384     PrepareWaterMarkError(devices, query);
1385 
1386     /**
1387      * @tc.steps: step4. sync again see it work correct
1388      * @tc.expected: step4. should return OK.
1389      */
1390     Key key = {'2'};
1391     ASSERT_TRUE(g_kvDelegatePtr->Put(key, {}) == OK);
1392     query = Query::Select();
1393     SyncWithQuery(devices, query, DistributedDB::SYNC_MODE_PULL_ONLY, OK);
1394 
1395     VirtualDataItem item;
1396     EXPECT_EQ(g_deviceB->GetData(key, item), -E_NOT_FOUND);
1397 }
1398 
RegOnDispatchToGetSyncCount(int & sendRequestCount,int sleepMs=0)1399 void RegOnDispatchToGetSyncCount(int &sendRequestCount, int sleepMs = 0)
1400 {
1401     g_communicatorAggregator->RegOnDispatch([sleepMs, &sendRequestCount](
1402             const std::string &dev, Message *inMsg) {
1403         if (dev == DEVICE_B && inMsg->GetMessageType() == TYPE_REQUEST) {
1404             std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
1405             sendRequestCount++;
1406             LOGD("sendRequestCount++...");
1407         }
1408     });
1409 }
1410 
TestDifferentSyncMode(SyncMode mode)1411 void TestDifferentSyncMode(SyncMode mode)
1412 {
1413     std::vector<std::string> devices;
1414     devices.push_back(g_deviceB->GetDeviceId());
1415 
1416     /**
1417      * @tc.steps: step1. deviceA put {k1, v1}
1418      */
1419     Key key = {'1'};
1420     Value value = {'1'};
1421     DBStatus status = g_kvDelegatePtr->Put(key, value);
1422     ASSERT_TRUE(status == OK);
1423 
1424     int sendRequestCount = 0;
1425     RegOnDispatchToGetSyncCount(sendRequestCount);
1426 
1427     /**
1428      * @tc.steps: step2. deviceA call sync and wait
1429      * @tc.expected: step2. sync should return OK.
1430      */
1431     std::map<std::string, DBStatus> result;
1432     status = g_tool.SyncTest(g_kvDelegatePtr, devices, mode, result);
1433     ASSERT_TRUE(status == OK);
1434 
1435     /**
1436      * @tc.expected: step2. onComplete should be called, DeviceB have {k1,v1}, send request message 3 times
1437      */
1438     ASSERT_TRUE(result.size() == devices.size());
1439     for (const auto &pair : result) {
1440         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1441         EXPECT_TRUE(pair.second == OK);
1442     }
1443     VirtualDataItem item;
1444     g_deviceB->GetData(key, item);
1445     EXPECT_TRUE(item.value == value);
1446 
1447     EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1448 
1449     /**
1450      * @tc.steps: step3. reset sendRequestCount to 0, deviceA call sync and wait again without any change in db
1451      * @tc.expected: step3. sync should return OK, and sendRequestCount should be 1, because this merge can not
1452      * be skipped
1453      */
1454     sendRequestCount = 0;
1455     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1456     ASSERT_TRUE(status == OK);
1457     EXPECT_EQ(sendRequestCount, 1);
1458 }
1459 
1460 /**
1461  * @tc.name: PushSyncMergeCheck001
1462  * @tc.desc: Test push sync task merge, task can not be merged when the two sync task is not in the queue
1463  * at the same time.
1464  * @tc.type: FUNC
1465  * @tc.require:
1466  * @tc.author: zhangshijie
1467  */
1468 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck001, TestSize.Level1)
1469 {
1470     TestDifferentSyncMode(SYNC_MODE_PUSH_ONLY);
1471 }
1472 
1473 /**
1474  * @tc.name: PushSyncMergeCheck002
1475  * @tc.desc: Test push_pull sync task merge, task can not be merged when the two sync task is not in the queue
1476  * at the same time.
1477  * @tc.type: FUNC
1478  * @tc.require:
1479  * @tc.author: zhangshijie
1480  */
1481 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck002, TestSize.Level1)
1482 {
1483     TestDifferentSyncMode(SYNC_MODE_PUSH_PULL);
1484 }
1485 
PrepareForSyncMergeTest(std::vector<std::string> & devices,int & sendRequestCount)1486 void PrepareForSyncMergeTest(std::vector<std::string> &devices, int &sendRequestCount)
1487 {
1488     /**
1489      * @tc.steps: step1. deviceA put {k1, v1}
1490      */
1491     Key key = {'1'};
1492     Value value = {'1'};
1493     DBStatus status = g_kvDelegatePtr->Put(key, value);
1494     ASSERT_TRUE(status == OK);
1495 
1496     RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1497 
1498     /**
1499      * @tc.steps: step2. deviceA call sync and don't wait
1500      * @tc.expected: step2. sync should return OK.
1501      */
1502     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
1503         [&sendRequestCount, devices, key, value](const std::map<std::string, DBStatus>& statusMap) {
1504         ASSERT_TRUE(statusMap.size() == devices.size());
1505         for (const auto &pair : statusMap) {
1506             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1507             EXPECT_TRUE(pair.second == OK);
1508         }
1509         VirtualDataItem item;
1510         g_deviceB->GetData(key, item);
1511         EXPECT_EQ(item.value, value);
1512         EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1513 
1514         // reset sendRequestCount to 0
1515         sendRequestCount = 0;
1516     });
1517     ASSERT_TRUE(status == OK);
1518 }
1519 
1520 /**
1521  * @tc.name: PushSyncMergeCheck003
1522  * @tc.desc: Test push sync task merge, task can not be merged when there is change in db since last push sync
1523  * @tc.type: FUNC
1524  * @tc.require:
1525  * @tc.author: zhangshijie
1526  */
1527 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck003, TestSize.Level3)
1528 {
1529     DBStatus status = OK;
1530     std::vector<std::string> devices;
1531     devices.push_back(g_deviceB->GetDeviceId());
1532 
1533     int sendRequestCount = 0;
1534     PrepareForSyncMergeTest(devices, sendRequestCount);
1535 
1536     /**
1537      * @tc.steps: step3. deviceA call sync and don't wait
1538      * @tc.expected: step3. sync should return OK.
1539      */
1540     Key key = {'1'};
1541     Value value = {'2'};
1542     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anonde2bd9672002(const std::map<std::string, DBStatus>& statusMap) 1543         [&sendRequestCount, devices, key, value, this](const std::map<std::string, DBStatus>& statusMap) {
1544         /**
1545          * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1546          * skipped, but it is no need to do time sync and ability sync, only need to do data sync
1547          */
1548         ASSERT_TRUE(statusMap.size() == devices.size());
1549         for (const auto &pair : statusMap) {
1550             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1551             EXPECT_TRUE(pair.second == OK);
1552         }
1553         VirtualDataItem item;
1554         g_deviceB->GetData(key, item);
1555         EXPECT_EQ(item.value, value);
1556     });
1557     ASSERT_TRUE(status == OK);
1558 
1559     /**
1560      * @tc.steps: step4. deviceA put {k1, v2}
1561      */
1562     while (sendRequestCount < TWO_CNT) {
1563         std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1564     }
1565     status = g_kvDelegatePtr->Put(key, value);
1566     ASSERT_TRUE(status == OK);
1567     // wait for the second sync task finish
1568     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1569     EXPECT_EQ(sendRequestCount, 1);
1570 }
1571 
1572 /**
1573  * @tc.name: PushSyncMergeCheck004
1574  * @tc.desc: Test push sync task merge, task can be merged when there is no change in db since last push sync
1575  * @tc.type: FUNC
1576  * @tc.require:
1577  * @tc.author: zhangshijie
1578  */
1579 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck004, TestSize.Level3)
1580 {
1581     DBStatus status = OK;
1582     std::vector<std::string> devices;
1583     devices.push_back(g_deviceB->GetDeviceId());
1584 
1585     int sendRequestCount = 0;
1586     PrepareForSyncMergeTest(devices, sendRequestCount);
1587 
1588     /**
1589      * @tc.steps: step3. deviceA call sync and don't wait
1590      * @tc.expected: step3. sync should return OK.
1591      */
1592     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anonde2bd9672102(const std::map<std::string, DBStatus>& statusMap) 1593         [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1594         /**
1595          * @tc.expected: when the second sync task return, sendRequestCount should be 0, because this merge can  be
1596          * skipped
1597          */
1598         ASSERT_TRUE(statusMap.size() == devices.size());
1599         for (const auto &pair : statusMap) {
1600             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1601             EXPECT_TRUE(pair.second == OK);
1602         }
1603     });
1604     ASSERT_TRUE(status == OK);
1605     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1606     EXPECT_EQ(sendRequestCount, 0);
1607 }
1608 
RegOnDispatchWithInvalidMsgAndCnt(int & sendRequestCount,int sleepMs,bool & invalid)1609 void RegOnDispatchWithInvalidMsgAndCnt(int &sendRequestCount, int sleepMs, bool &invalid)
1610 {
1611     g_communicatorAggregator->RegOnDispatch([&sendRequestCount, sleepMs, &invalid](
1612         const std::string &dev, Message *inMsg) {
1613         if (dev == DEVICE_B && !invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
1614             inMsg->SetMessageType(TYPE_INVALID);
1615             inMsg->SetMessageId(INVALID_MESSAGE_ID);
1616             sendRequestCount++;
1617             invalid = true;
1618             LOGW("[Dispatch]invalid THIS MSG, sendRequestCount = %d", sendRequestCount);
1619             std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
1620         }
1621     });
1622 }
1623 
1624 /**
1625  * @tc.name: PushSyncMergeCheck005
1626  * @tc.desc: Test push sync task merge, task cannot be merged when the last push sync is failed
1627  * @tc.type: FUNC
1628  * @tc.require:
1629  * @tc.author: zhangshijie
1630  */
1631 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck005, TestSize.Level3)
1632 {
1633     DBStatus status = OK;
1634     std::vector<std::string> devices;
1635     devices.push_back(g_deviceB->GetDeviceId());
1636 
1637     /**
1638      * @tc.steps: step1. deviceA put {k1, v1}
1639      */
1640     Key key = {'1'};
1641     Value value = {'1'};
1642     status = g_kvDelegatePtr->Put(key, value);
1643     ASSERT_TRUE(status == OK);
1644 
1645     int sendRequestCount = 0;
1646     bool invalid = false;
1647     RegOnDispatchWithInvalidMsgAndCnt(sendRequestCount, SLEEP_MILLISECONDS, invalid);
1648 
1649     /**
1650      * @tc.steps: step2. deviceA call sync and don't wait
1651      * @tc.expected: step2. sync should return TIME_OUT.
1652      */
1653     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anonde2bd9672302(const std::map<std::string, DBStatus>& statusMap) 1654         [&sendRequestCount, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1655         ASSERT_TRUE(statusMap.size() == devices.size());
1656         for (const auto &deviceId : devices) {
1657             ASSERT_EQ(statusMap.at(deviceId), TIME_OUT);
1658         }
1659     });
1660     EXPECT_TRUE(status == OK);
1661 
1662     /**
1663      * @tc.steps: step3. deviceA call sync and don't wait
1664      * @tc.expected: step3. sync should return OK.
1665      */
1666     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anonde2bd9672402(const std::map<std::string, DBStatus>& statusMap) 1667         [key, value, &sendRequestCount, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1668         /**
1669          * @tc.expected: when the second sync task return, sendRequestCount should be 3, because this merge can not be
1670          * skipped, deviceB should have {k1, v1}.
1671          */
1672         ASSERT_TRUE(statusMap.size() == devices.size());
1673         for (const auto &pair : statusMap) {
1674             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1675             EXPECT_EQ(pair.second, OK);
1676         }
1677         VirtualDataItem item;
1678         g_deviceB->GetData(key, item);
1679         EXPECT_EQ(item.value, value);
1680     });
1681     ASSERT_TRUE(status == OK);
1682     while (sendRequestCount < 1) {
1683         std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1684     }
1685     sendRequestCount = 0;
1686     RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1687 
1688     // wait for the second sync task finish
1689     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1690     EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1691 }
1692 
PrePareForQuerySyncMergeTest(bool isQuerySync,std::vector<std::string> & devices,Key & key,Value & value,int & sendRequestCount)1693 void PrePareForQuerySyncMergeTest(bool isQuerySync, std::vector<std::string> &devices,
1694     Key &key, Value &value, int &sendRequestCount)
1695 {
1696     DBStatus status = OK;
1697     /**
1698      * @tc.steps: step1. deviceA put {k1, v1}...{k10, v10}
1699      */
1700     Query query = Query::Select().PrefixKey(key);
1701     const int dataSize = 10;
1702     for (int i = 0; i < dataSize; i++) {
1703         key.push_back(i);
1704         value.push_back(i);
1705         status = g_kvDelegatePtr->Put(key, value);
1706         ASSERT_TRUE(status == OK);
1707         key.pop_back();
1708         value.pop_back();
1709     }
1710 
1711     RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1712     /**
1713      * @tc.steps: step2. deviceA call query sync and don't wait
1714      * @tc.expected: step2. sync should return OK.
1715      */
1716     auto completeCallBack = [&sendRequestCount, &key, &value, dataSize, devices]
1717         (const std::map<std::string, DBStatus>& statusMap) {
1718         ASSERT_TRUE(statusMap.size() == devices.size());
1719         for (const auto &pair : statusMap) {
1720             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1721             EXPECT_EQ(pair.second, OK);
1722         }
1723         // when first sync finish, DeviceB have {k1,v1}, {k3,v3}, {k5,v5} .. send request message 3 times
1724         VirtualDataItem item;
1725         for (int i = 0; i < dataSize; i++) {
1726             key.push_back(i);
1727             value.push_back(i);
1728             g_deviceB->GetData(key, item);
1729             EXPECT_EQ(item.value, value);
1730             key.pop_back();
1731             value.pop_back();
1732         }
1733         EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1734         // reset sendRequestCount to 0
1735         sendRequestCount = 0;
1736     };
1737     if (isQuerySync) {
1738         status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, completeCallBack, query, false);
1739     } else {
1740         status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, completeCallBack);
1741     }
1742     ASSERT_TRUE(status == OK);
1743 }
1744 
1745 /**
1746  * @tc.name: QuerySyncMergeCheck001
1747  * @tc.desc: Test query push sync task merge, task can be merged when there is no change in db since last query sync
1748  * @tc.type: FUNC
1749  * @tc.require:
1750  * @tc.author: zhangshijie
1751  */
1752 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck001, TestSize.Level3)
1753 {
1754     std::vector<std::string> devices;
1755     int sendRequestCount = 0;
1756     devices.push_back(g_deviceB->GetDeviceId());
1757 
1758     Key key {'1'};
1759     Value value {'1'};
1760     Query query = Query::Select().PrefixKey(key);
1761     PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1762 
1763     /**
1764      * @tc.steps: step3. deviceA call query sync and don't wait
1765      * @tc.expected: step3. sync should return OK.
1766      */
1767     DBStatus status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anonde2bd9672602(const std::map<std::string, DBStatus>& statusMap) 1768         [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1769         /**
1770          * @tc.expected: when the second sync task return, sendRequestCount should be 0, because this merge can be
1771          * skipped because there is no change in db since last query sync
1772          */
1773         ASSERT_TRUE(statusMap.size() == devices.size());
1774         for (const auto &pair : statusMap) {
1775             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1776             EXPECT_TRUE(pair.second == OK);
1777         }
1778     }, query, false);
1779     ASSERT_TRUE(status == OK);
1780     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1781     EXPECT_EQ(sendRequestCount, 0);
1782 }
1783 
1784 /**
1785  * @tc.name: QuerySyncMergeCheck002
1786  * @tc.desc: Test query push sync task merge, task can not be merged when there is change in db since last sync
1787  * @tc.type: FUNC
1788  * @tc.require:
1789  * @tc.author: zhangshijie
1790  */
1791 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck002, TestSize.Level3)
1792 {
1793     std::vector<std::string> devices;
1794     int sendRequestCount = 0;
1795     devices.push_back(g_deviceB->GetDeviceId());
1796 
1797     Key key {'1'};
1798     Value value {'1'};
1799     Query query = Query::Select().PrefixKey(key);
1800     PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1801 
1802     /**
1803      * @tc.steps: step3. deviceA call query sync and don't wait
1804      * @tc.expected: step3. sync should return OK.
1805      */
1806     Value value3{'3'};
1807     DBStatus status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anonde2bd9672702(const std::map<std::string, DBStatus>& statusMap) 1808         [&sendRequestCount, devices, key, value3, this](const std::map<std::string, DBStatus>& statusMap) {
1809         /**
1810          * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1811          * skipped when there is change in db since last query sync, deviceB have {k1, v1'}
1812          */
1813         ASSERT_TRUE(statusMap.size() == devices.size());
1814         for (const auto &pair : statusMap) {
1815             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1816             EXPECT_TRUE(pair.second == OK);
1817         }
1818         VirtualDataItem item;
1819         g_deviceB->GetData(key, item);
1820         EXPECT_TRUE(item.value == value3);
1821         EXPECT_EQ(sendRequestCount, 1);
1822         }, query, false);
1823     ASSERT_TRUE(status == OK);
1824 
1825     /**
1826      * @tc.steps: step4. deviceA put {k1, v1'}
1827      * @tc.steps: step4. reset sendRequestCount to 0, deviceA call sync and wait
1828      * @tc.expected: step4. sync should return OK, and sendRequestCount should be 1, because this merge can not
1829      * be skipped
1830      */
1831     while (sendRequestCount < TWO_CNT) {
1832         std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1833     }
1834     g_kvDelegatePtr->Put(key, value3);
1835     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1836 }
1837 
1838 /**
1839  * @tc.name: QuerySyncMergeCheck003
1840  * @tc.desc: Test query push sync task merge, task can not be merged when then query id is different
1841  * @tc.type: FUNC
1842  * @tc.require:
1843  * @tc.author: zhangshijie
1844  */
1845 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck003, TestSize.Level3)
1846 {
1847     std::vector<std::string> devices;
1848     int sendRequestCount = 0;
1849     devices.push_back(g_deviceB->GetDeviceId());
1850 
1851     Key key {'1'};
1852     Value value {'1'};
1853     PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1854 
1855     /**
1856      * @tc.steps: step3.  deviceA call another query sync
1857      * @tc.expected: step3. sync should return OK.
1858      */
1859     Key key2 = {'2'};
1860     Value value2 = {'2'};
1861     DBStatus status = g_kvDelegatePtr->Put(key2, value2);
1862     ASSERT_TRUE(status == OK);
1863     Query query2 = Query::Select().PrefixKey(key2);
1864     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anonde2bd9672802(const std::map<std::string, DBStatus>& statusMap) 1865         [&sendRequestCount, key2, value2, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1866         /**
1867          * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1868          * skipped, deviceB have {k2,v2}
1869          */
1870         ASSERT_TRUE(statusMap.size() == devices.size());
1871         for (const auto &pair : statusMap) {
1872             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1873             EXPECT_TRUE(pair.second == OK);
1874         }
1875         VirtualDataItem item;
1876         g_deviceB->GetData(key2, item);
1877         EXPECT_TRUE(item.value == value2);
1878         EXPECT_EQ(sendRequestCount, 1);
1879         }, query2, false);
1880     ASSERT_TRUE(status == OK);
1881     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1882 }
1883 
1884 /**
1885 * @tc.name: QuerySyncMergeCheck004
1886 * @tc.desc: Test query push sync task merge, task can be merged when there is no change in db since last push sync
1887 * @tc.type: FUNC
1888 * @tc.require:
1889 * @tc.author: zhangshijie
1890 */
1891 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck004, TestSize.Level3)
1892 {
1893     DBStatus status = OK;
1894     std::vector<std::string> devices;
1895     devices.push_back(g_deviceB->GetDeviceId());
1896 
1897     Key key {'1'};
1898     Value value {'1'};
1899     int sendRequestCount = 0;
1900     PrePareForQuerySyncMergeTest(false, devices, key, value, sendRequestCount);
1901 
1902     /**
1903      * @tc.steps: step3. deviceA call query sync without any change in db
1904      * @tc.expected: step3. sync should return OK, and sendRequestCount should be 0, because this merge can be skipped
1905      */
1906     Query query = Query::Select().PrefixKey(key);
1907     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anonde2bd9672902(const std::map<std::string, DBStatus>& statusMap) 1908         [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1909             /**
1910              * @tc.expected step3: when the second sync task return, sendRequestCount should be 0, because this merge
1911              * can be skipped because there is no change in db since last push sync
1912              */
1913             ASSERT_TRUE(statusMap.size() == devices.size());
1914             for (const auto &pair : statusMap) {
1915                 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1916                 EXPECT_TRUE(pair.second == OK);
1917             }
1918         }, query, false);
1919     ASSERT_TRUE(status == OK);
1920     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1921     EXPECT_EQ(sendRequestCount, 0);
1922 }
1923 
1924 /**
1925   * @tc.name: GetDataNotify001
1926   * @tc.desc: Test GetDataNotify function, delay < 30s should sync ok, > 36 should timeout
1927   * @tc.type: FUNC
1928   * @tc.require:
1929   * @tc.author: zhangqiquan
1930   */
1931 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify001, TestSize.Level3)
1932 {
1933     ASSERT_NE(g_kvDelegatePtr, nullptr);
1934     DBStatus status = OK;
1935     std::vector<std::string> devices;
1936     devices.push_back(g_deviceB->GetDeviceId());
1937     const std::string DEVICE_A = "real_device";
1938     /**
1939      * @tc.steps: step1. deviceB set get data delay 40s
1940      */
1941     g_deviceB->DelayGetSyncData(WAIT_40_SECONDS);
1942     g_communicatorAggregator->SetTimeout(DEVICE_A, TIMEOUT_6_SECONDS);
1943 
1944     /**
1945      * @tc.steps: step2. deviceA call sync and wait
1946      * @tc.expected: step2. sync should return OK. onComplete should be called, deviceB sync TIME_OUT.
1947      */
1948     std::map<std::string, DBStatus> result;
1949     std::map<std::string, int> virtualRes;
1950     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1951     EXPECT_EQ(status, OK);
1952     EXPECT_EQ(result.size(), devices.size());
1953     EXPECT_EQ(result[DEVICE_B], TIME_OUT);
1954     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1955     Query query = Query::Select();
__anonde2bd9672a02(std::map<std::string, int> resMap) 1956     g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1957         virtualRes = std::move(resMap);
1958     }, true);
1959     EXPECT_EQ(virtualRes.size(), devices.size());
1960     EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_TIMEOUT));
1961     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1962 
1963     /**
1964      * @tc.steps: step3. deviceB set get data delay 30s
1965      */
1966     g_deviceB->DelayGetSyncData(WAIT_30_SECONDS);
1967     /**
1968      * @tc.steps: step4. deviceA call sync and wait
1969      * @tc.expected: step4. sync should return OK. onComplete should be called, deviceB sync OK.
1970      */
1971     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1972     EXPECT_EQ(status, OK);
1973     EXPECT_EQ(result.size(), devices.size());
1974     EXPECT_EQ(result[DEVICE_B], OK);
1975     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
__anonde2bd9672b02(std::map<std::string, int> resMap) 1976     g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1977         virtualRes = std::move(resMap);
1978     }, true);
1979     EXPECT_EQ(virtualRes.size(), devices.size());
1980     EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_FINISHED_ALL));
1981     g_deviceB->DelayGetSyncData(0);
1982 }
1983 
1984 /**
1985   * @tc.name: GetDataNotify002
1986   * @tc.desc: Test GetDataNotify function, two device sync each other at same time
1987   * @tc.type: FUNC
1988   * @tc.require:
1989   * @tc.author: zhangqiquan
1990   */
1991 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify002, TestSize.Level3)
1992 {
1993     ASSERT_NE(g_kvDelegatePtr, nullptr);
1994     DBStatus status = OK;
1995     std::vector<std::string> devices;
1996     devices.push_back(g_deviceB->GetDeviceId());
1997     const std::string DEVICE_A = "real_device";
1998 
1999     /**
2000      * @tc.steps: step1. deviceA sync first to finish time sync and ability sync
2001      */
2002     std::map<std::string, DBStatus> result;
2003     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
2004     EXPECT_EQ(status, OK);
2005     EXPECT_EQ(result.size(), devices.size());
2006     EXPECT_EQ(result[DEVICE_B], OK);
2007     /**
2008      * @tc.steps: step2. deviceB set get data delay 30s
2009      */
2010     g_deviceB->DelayGetSyncData(WAIT_30_SECONDS);
2011 
2012     /**
2013      * @tc.steps: step3. deviceB call sync and wait
2014      */
__anonde2bd9672c02() 2015     std::thread asyncThread([]() {
2016         std::map<std::string, int> virtualRes;
2017         Query query = Query::Select();
2018         g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
2019                 virtualRes = std::move(resMap);
2020             }, true);
2021     });
2022 
2023     /**
2024      * @tc.steps: step4. deviceA call sync and wait
2025      * @tc.expected: step4. sync should return OK. because notify timer trigger (30s - 1s)/2s => 15times
2026      */
2027     std::this_thread::sleep_for(std::chrono::seconds(1));
2028     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
2029     EXPECT_EQ(status, OK);
2030     EXPECT_EQ(result.size(), devices.size());
2031     EXPECT_EQ(result[DEVICE_B], OK);
2032     asyncThread.join();
2033     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
2034 }
2035 
2036 /**
2037  * @tc.name: DelaySync001
2038  * @tc.desc: Test delay first packet will not effect data conflict
2039  * @tc.type: FUNC
2040  * @tc.require:
2041  * @tc.author: zqq
2042  */
2043 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, DelaySync001, TestSize.Level3)
2044 {
2045     // B put (k, b) after A put (k, a)
2046     Key key = {'k'};
2047     Value aValue = {'a'};
2048     g_kvDelegatePtr->Put(key, aValue);
2049     std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for data conflict
2050     Timestamp currentTime = TimeHelper::GetSysCurrentTime() + TimeHelper::BASE_OFFSET;
2051     Value bValue = {'b'};
2052     EXPECT_EQ(g_deviceB->PutData(key, bValue, currentTime, 0), E_OK);
2053 
2054     // delay time sync message, delay time/2 should greater than put sleep time
2055     g_communicatorAggregator->SetTimeout(DEVICE_B, DBConstant::MAX_TIMEOUT);
2056     g_communicatorAggregator->SetTimeout("real_device", DBConstant::MAX_TIMEOUT);
__anonde2bd9672e02(const std::string &dstTarget, const Message *msg) 2057     g_communicatorAggregator->RegBeforeDispatch([](const std::string &dstTarget, const Message *msg) {
2058         if (dstTarget == DEVICE_B && msg->GetMessageId() == MessageId::TIME_SYNC_MESSAGE) {
2059             std::this_thread::sleep_for(std::chrono::seconds(3)); // sleep for 3s
2060         }
2061     });
2062 
2063     // A call sync and (k, b) in A
2064     std::vector<std::string> devices;
2065     devices.push_back(g_deviceB->GetDeviceId());
2066     std::map<std::string, DBStatus> result;
2067     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
2068     EXPECT_EQ(status, OK);
2069     EXPECT_EQ(result.size(), devices.size());
2070     EXPECT_EQ(result[DEVICE_B], OK);
2071 
2072     Value actualValue;
2073     g_kvDelegatePtr->Get(key, actualValue);
2074     EXPECT_EQ(actualValue, bValue);
2075     g_communicatorAggregator->RegBeforeDispatch(nullptr);
2076 }
2077 
2078 /**
2079  * @tc.name: KVAbilitySyncOpt001
2080  * @tc.desc: check ability sync 2 packet
2081  * @tc.type: FUNC
2082  * @tc.require:
2083  * @tc.author: zhangqiquan
2084  */
2085 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVAbilitySyncOpt001, TestSize.Level0)
2086 {
2087     /**
2088      * @tc.steps: step1. record packet
2089      * @tc.expected: step1. sync should failed in source.
2090      */
2091     std::atomic<int> messageCount = 0;
__anonde2bd9672f02(const std::string &dev, Message *msg) 2092     g_communicatorAggregator->RegOnDispatch([&messageCount](const std::string &dev, Message *msg) {
2093         if (msg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
2094             return;
2095         }
2096         messageCount++;
2097         EXPECT_GE(g_kvDelegatePtr->GetTaskCount(), 1);
2098     });
2099     /**
2100      * @tc.steps: step2. deviceA call sync and wait
2101      * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
2102      */
2103     DBStatus status = OK;
2104     std::vector<std::string> devices;
2105     devices.push_back(g_deviceB->GetDeviceId());
2106     std::map<std::string, DBStatus> result;
2107     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2108     EXPECT_EQ(status, OK);
2109     EXPECT_EQ(messageCount, 2); // 2 ability sync
2110     for (const auto &pair : result) {
2111         EXPECT_EQ(pair.second, OK);
2112     }
2113 }
2114 
2115 /**
2116  * @tc.name: KVAbilitySyncOpt002
2117  * @tc.desc: check get task count while conn is nullptr.
2118  * @tc.type: FUNC
2119  * @tc.require:
2120  * @tc.author: caihaoting
2121  */
2122 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVAbilitySyncOpt002, TestSize.Level0)
2123 {
2124     /**
2125      * @tc.steps: step1. record packet while conn is nullptr.
2126      * @tc.expected: step1. sync should failed in source and get task count return DB_ERROR.
2127      */
2128     auto kvStoreImpl = static_cast<KvStoreNbDelegateImpl *>(g_kvDelegatePtr);
2129     EXPECT_EQ(kvStoreImpl->Close(), OK);
2130     std::atomic<int> messageCount = 0;
__anonde2bd9673002(const std::string &dev, Message *msg) 2131     g_communicatorAggregator->RegOnDispatch([&messageCount](const std::string &dev, Message *msg) {
2132         if (msg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
2133             return;
2134         }
2135         messageCount++;
2136         EXPECT_EQ(g_kvDelegatePtr->GetTaskCount(), DB_ERROR);
2137     });
2138 }
2139 
2140 /**
2141  * @tc.name: KVSyncOpt001
2142  * @tc.desc: check time sync and ability sync once
2143  * @tc.type: FUNC
2144  * @tc.require:
2145  * @tc.author: zhangqiquan
2146  */
2147 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt001, TestSize.Level0)
2148 {
2149     /**
2150      * @tc.steps: step1. record packet which send to B
2151      */
2152     std::atomic<int> messageCount = 0;
2153     RegOnDispatchWithoutDataPacket(messageCount);
2154     /**
2155      * @tc.steps: step2. deviceA call sync and wait
2156      * @tc.expected: step2. sync should return OK.
2157      */
2158     std::vector<std::string> devices;
2159     devices.push_back(g_deviceB->GetDeviceId());
2160     Sync(devices, OK);
2161     EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
2162     /**
2163      * @tc.steps: step3. reopen kv store
2164      * @tc.expected: step3. reopen OK.
2165      */
2166     ReOpenDB();
2167     /**
2168      * @tc.steps: step4. reopen kv store and sync again
2169      * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
2170      */
2171     messageCount = 0;
2172     Sync(devices, OK);
2173     EXPECT_EQ(messageCount, 0);
2174     g_communicatorAggregator->RegOnDispatch(nullptr);
2175 }
2176 
2177 /**
2178  * @tc.name: KVSyncOpt002
2179  * @tc.desc: check device time sync once
2180  * @tc.type: FUNC
2181  * @tc.require:
2182  * @tc.author: zhangqiquan
2183  */
2184 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt002, TestSize.Level0)
2185 {
2186 /**
2187      * @tc.steps: step1. record packet which send to B
2188      */
2189     std::atomic<int> messageCount = 0;
2190     RegOnDispatchWithoutDataPacket(messageCount);
2191     /**
2192      * @tc.steps: step2. deviceA call sync and wait
2193      * @tc.expected: step2. sync should return OK.
2194      */
2195     std::vector<std::string> devices;
2196     devices.push_back(g_deviceB->GetDeviceId());
2197     Sync(devices, OK);
2198     EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
2199     // close kv store avoid packet dispatch error
2200     ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
2201     g_kvDelegatePtr = nullptr;
2202     ASSERT_EQ(g_mgr.DeleteKvStore(STORE_ID), OK);
2203     EXPECT_TRUE(RuntimeContext::GetInstance()->IsTimeTickMonitorValid());
2204     /**
2205      * @tc.steps: step3. open new kv store
2206      * @tc.expected: step3. open OK.
2207      */
2208     KvStoreNbDelegate::Option option;
2209     option.secOption.securityLabel = SecurityLabel::S3;
2210     option.secOption.securityFlag = SecurityFlag::SECE;
2211     KvStoreNbDelegate *delegate2 = nullptr;
__anonde2bd9673102(DBStatus status, KvStoreNbDelegate *delegate) 2212     g_mgr.GetKvStore(STORE_ID_2, option, [&delegate2](DBStatus status, KvStoreNbDelegate *delegate) {
2213         delegate2 = delegate;
2214         EXPECT_EQ(status, OK);
2215     });
2216     /**
2217      * @tc.steps: step4. sync again
2218      * @tc.expected: step4. sync success, only ability sync packet.
2219      */
2220     messageCount = 0;
2221     Sync(delegate2, devices, OK);
2222     EXPECT_EQ(messageCount, 1); // 1 contain ability sync packet
2223     EXPECT_EQ(g_mgr.CloseKvStore(delegate2), OK);
2224     EXPECT_EQ(g_mgr.DeleteKvStore(STORE_ID_2), OK);
2225     g_communicatorAggregator->RegOnDispatch(nullptr);
2226 }
2227 
2228 /**
2229  * @tc.name: KVSyncOpt003
2230  * @tc.desc: check time sync and ability sync once
2231  * @tc.type: FUNC
2232  * @tc.require:
2233  * @tc.author: zhangqiquan
2234  */
2235 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt003, TestSize.Level0)
2236 {
2237     /**
2238      * @tc.steps: step1. record packet which send to B
2239      */
2240     std::atomic<int> messageCount = 0;
2241     RegOnDispatchWithoutDataPacket(messageCount);
2242     /**
2243      * @tc.steps: step2. deviceA call sync and wait
2244      * @tc.expected: step2. sync should return OK.
2245      */
2246     std::vector<std::string> devices;
2247     devices.push_back(g_deviceB->GetDeviceId());
2248     Sync(devices, OK);
2249     EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
2250     /**
2251      * @tc.steps: step3. reopen kv store
2252      * @tc.expected: step3. reopen OK.
2253      */
2254     ReOpenDB();
2255     /**
2256      * @tc.steps: step4. reopen kv store and sync again
2257      * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
2258      */
2259     messageCount = 0;
2260     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2261     EXPECT_EQ(messageCount, 0);
2262     g_communicatorAggregator->RegOnDispatch(nullptr);
2263 }
2264 
2265 /**
2266  * @tc.name: KVSyncOpt004
2267  * @tc.desc: check sync in keys after reopen db
2268  * @tc.type: FUNC
2269  * @tc.require:
2270  * @tc.author: zhangqiquan
2271  */
2272 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt004, TestSize.Level0)
2273 {
2274     /**
2275      * @tc.steps: step1. deviceA call sync and wait
2276      * @tc.expected: step1. sync should return OK.
2277      */
2278     std::vector<std::string> devices;
2279     devices.push_back(g_deviceB->GetDeviceId());
2280     Sync(devices, OK);
2281     /**
2282      * @tc.steps: step2. reopen kv store
2283      * @tc.expected: step2. reopen OK.
2284      */
2285     ReOpenDB();
2286     /**
2287      * @tc.steps: step3. sync with in keys
2288      * @tc.expected: step3. sync OK.
2289      */
2290     std::map<std::string, DBStatus> result;
2291     std::set<Key> condition;
2292     condition.insert({'k'});
2293     Query query = Query::Select().InKeys(condition);
2294     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result, query);
2295     EXPECT_EQ(status, OK);
2296     for (const auto &deviceId : devices) {
2297         EXPECT_EQ(result[deviceId], OK);
2298     }
2299 }
2300 
2301 /**
2302  * @tc.name: KVSyncOpt005
2303  * @tc.desc: check record ability finish after receive ability sync
2304  * @tc.type: FUNC
2305  * @tc.require:
2306  * @tc.author: zhangqiquan
2307  */
2308 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt005, TestSize.Level0)
2309 {
2310     /**
2311      * @tc.steps: step1. record packet which send to B
2312      */
2313     std::atomic<int> messageCount = 0;
2314     RegOnDispatchWithoutDataPacket(messageCount, true);
2315     /**
2316      * @tc.steps: step2. deviceB call sync and wait
2317      * @tc.expected: step2. sync should return OK.
2318      */
2319     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2320     EXPECT_EQ(messageCount, 2); // DEV_A send negotiation 2 ack packet.
2321     /**
2322      * @tc.steps: step3. reopen kv store
2323      * @tc.expected: step3. reopen OK.
2324      */
2325     ReOpenDB();
2326     /**
2327      * @tc.steps: step4. reopen kv store and sync again
2328      * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
2329      */
2330     messageCount = 0;
2331     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2332     EXPECT_EQ(messageCount, 0);
2333     g_communicatorAggregator->RegOnDispatch(nullptr);
2334 }
2335 
2336 /**
2337  * @tc.name: KVSyncOpt006
2338  * @tc.desc: check time sync and ability sync once after rebuild
2339  * @tc.type: FUNC
2340  * @tc.require:
2341  * @tc.author: zhangqiquan
2342  */
2343 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt006, TestSize.Level0)
2344 {
2345     /**
2346      * @tc.steps: step1. record packet which send to B
2347      */
2348     std::atomic<int> messageCount = 0;
2349     RegOnDispatchWithoutDataPacket(messageCount, true);
2350     /**
2351      * @tc.steps: step2. deviceA call sync and wait
2352      * @tc.expected: step2. sync should return OK.
2353      */
2354     std::vector<std::string> devices;
2355     devices.push_back(g_deviceB->GetDeviceId());
2356     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2357     EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
2358     /**
2359      * @tc.steps: step3. rebuild kv store
2360      * @tc.expected: step3. rebuild OK.
2361      */
2362     ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
2363     g_kvDelegatePtr = nullptr;
2364     g_mgr.DeleteKvStore(STORE_ID);
2365     KvStoreNbDelegate::Option option;
2366     option.secOption.securityLabel = SecurityLabel::S3;
2367     option.secOption.securityFlag = SecurityFlag::SECE;
2368     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
2369     ASSERT_TRUE(g_kvDelegateStatus == OK);
2370     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2371     /**
2372      * @tc.steps: step4. rebuild kv store and sync again
2373      * @tc.expected: step4. rebuild OK and sync success, re ability sync.
2374      */
2375     messageCount = 0;
2376     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2377     EXPECT_EQ(messageCount, 1);
2378     g_communicatorAggregator->RegOnDispatch(nullptr);
2379 }
2380 
2381 /**
2382  * @tc.name: KVSyncOpt007
2383  * @tc.desc: check re ability sync after import
2384  * @tc.type: FUNC
2385  * @tc.require:
2386  * @tc.author: zhangqiquan
2387  */
2388 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt007, TestSize.Level0)
2389 {
2390     /**
2391      * @tc.steps: step1. record packet which send to B
2392      */
2393     std::atomic<int> messageCount = 0;
2394     RegOnDispatchWithoutDataPacket(messageCount, true);
2395     /**
2396      * @tc.steps: step2. deviceB call sync and wait
2397      * @tc.expected: step2. sync should return OK.
2398      */
2399     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2400     EXPECT_EQ(messageCount, 2); // DEV_A send negotiation 2 ack packet.
2401     /**
2402      * @tc.steps: step3. export and import
2403      * @tc.expected: step3. export and import OK.
2404      */
2405     std::string singleExportFileName = g_testDir + "/KVSyncOpt007.$$";
2406     CipherPassword passwd;
2407     EXPECT_EQ(g_kvDelegatePtr->Export(singleExportFileName, passwd), OK);
2408     EXPECT_EQ(g_kvDelegatePtr->Import(singleExportFileName, passwd), OK);
2409     /**
2410      * @tc.steps: step4. reopen kv store and sync again
2411      * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
2412      */
2413     messageCount = 0;
2414     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2415     EXPECT_EQ(messageCount, 1); // DEV_A send negotiation 1 ack packet.
2416     g_communicatorAggregator->RegOnDispatch(nullptr);
2417 }
2418 
2419 /**
2420  * @tc.name: KVSyncOpt008
2421  * @tc.desc: check rebuild open store with NOT_SET.
2422  * @tc.type: FUNC
2423  * @tc.require:
2424  * @tc.author: tankaisheng
2425  */
2426 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt008, TestSize.Level0)
2427 {
2428     /**
2429      * @tc.steps: step1. record packet which send to B
2430      */
2431     std::atomic<int> messageCount = 0;
2432     RegOnDispatchWithoutDataPacket(messageCount, true);
2433     /**
2434      * @tc.steps: step2. deviceA call sync and wait
2435      * @tc.expected: step2. sync should return OK.
2436      */
2437     std::vector<std::string> devices;
2438     devices.push_back(g_deviceB->GetDeviceId());
2439     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2440     EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
2441     /**
2442      * @tc.steps: step3. rebuild kv store
2443      * @tc.expected: step3. rebuild failed.
2444      */
2445     ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
2446     g_kvDelegatePtr = nullptr;
2447     g_mgr.DeleteKvStore(STORE_ID);
2448     KvStoreNbDelegate::Option option;
2449     option.secOption.securityLabel = SecurityLabel::NOT_SET;
2450     option.secOption.securityFlag = SecurityFlag::SECE;
2451     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
2452     ASSERT_TRUE(g_kvDelegateStatus == DBStatus::INVALID_ARGS);
2453     ASSERT_TRUE(g_kvDelegatePtr == nullptr);
2454 }
2455 
2456 /**
2457  * @tc.name: KVTimeChange001
2458  * @tc.desc: check time sync and ability sync once
2459  * @tc.type: FUNC
2460  * @tc.require:
2461  * @tc.author: zhangqiquan
2462  */
2463 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVTimeChange001, TestSize.Level0)
2464 {
2465     /**
2466      * @tc.steps: step1. record packet which send to B
2467      */
2468     std::atomic<int> messageCount = 0;
2469     RegOnDispatchWithoutDataPacket(messageCount);
2470     /**
2471      * @tc.steps: step2. deviceA call sync and wait
2472      * @tc.expected: step2. sync should return OK.
2473      */
2474     std::vector<std::string> devices;
2475     devices.push_back(g_deviceB->GetDeviceId());
2476     Sync(devices, OK);
2477     EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
2478     /**
2479      * @tc.steps: step3. sync again
2480      * @tc.expected: step3. sync success, no negotiation packet.
2481      */
2482     messageCount = 0;
2483     Sync(devices, OK);
2484     EXPECT_EQ(messageCount, 0);
2485     /**
2486      * @tc.steps: step4. modify time offset and sync again
2487      * @tc.expected: step4. sync success, only time sync packet.
2488      */
2489     RuntimeContext::GetInstance()->NotifyTimestampChanged(100);
2490     RuntimeContext::GetInstance()->RecordAllTimeChange();
2491     RuntimeContext::GetInstance()->ClearAllDeviceTimeInfo();
2492     messageCount = 0;
2493     Sync(devices, OK);
2494     EXPECT_EQ(messageCount, 1); // 1 contain time sync request packet
2495     messageCount = 0;
2496     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2497     EXPECT_EQ(messageCount, 0);
2498     g_communicatorAggregator->RegOnDispatch(nullptr);
2499 }
2500 
2501 /**
2502  * @tc.name: KVTimeChange002
2503  * @tc.desc: test NotifyTimestampChanged will not stuck when notify delegate with no metadata
2504  * @tc.type: FUNC
2505  * @tc.require:
2506  * @tc.author: liuhongyang
2507  */
2508 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVTimeChange002, TestSize.Level0)
2509 {
2510     /**
2511      * @tc.steps: step1. open a new store with STORE_ID_3
2512      * @tc.expected: step1. open success
2513      */
2514     KvStoreNbDelegate::Option option;
2515     option.secOption.securityLabel = SecurityLabel::S3;
2516     option.secOption.securityFlag = SecurityFlag::SECE;
2517     KvStoreNbDelegate *delegate2 = nullptr;
__anonde2bd9673202(DBStatus status, KvStoreNbDelegate *delegate) 2518     g_mgr.GetKvStore(STORE_ID_3, option, [&delegate2](DBStatus status, KvStoreNbDelegate *delegate) {
2519         delegate2 = delegate;
2520         EXPECT_EQ(status, OK);
2521     });
2522     ASSERT_TRUE(delegate2 != nullptr);
2523     /**
2524      * @tc.steps: step2. STORE_ID_3 sync once so that it will be notified when time change
2525      * @tc.expected: step2. sync should return OK.
2526      */
2527     std::vector<std::string> devices;
2528     devices.push_back(g_deviceB->GetDeviceId());
2529     std::map<std::string, DBStatus> result;
2530     EXPECT_EQ(g_tool.SyncTest(delegate2, devices, SYNC_MODE_PULL_ONLY, result, true), OK);
2531     /**
2532      * @tc.steps: step3. deviceA call sync and wait
2533      * @tc.expected: step3. sync should return OK.
2534      */
2535     Sync(devices, OK);
2536     /**
2537      * @tc.steps: step4. call NotifyTimestampChanged
2538      * @tc.expected: step4. expect no deadlock
2539      */
2540     RuntimeContext::GetInstance()->NotifyTimestampChanged(100);
2541     /**
2542      * @tc.steps: step5. clean up the created db
2543      */
2544     ASSERT_EQ(g_mgr.CloseKvStore(delegate2), OK);
2545     delegate2 = nullptr;
2546     ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID_3) == OK);
2547 }
2548 }