• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 
18 #include "distributeddb_data_generate_unit_test.h"
19 #include "distributeddb_tools_unit_test.h"
20 #include "kv_virtual_device.h"
21 #include "platform_specific.h"
22 #include "process_system_api_adapter_impl.h"
23 #include "single_ver_data_packet.h"
24 #include "virtual_communicator_aggregator.h"
25 
26 using namespace testing::ext;
27 using namespace DistributedDB;
28 using namespace DistributedDBUnitTest;
29 using namespace std;
30 
31 namespace {
32     string g_testDir;
33     const string STORE_ID = "kv_stroe_sync_check_test";
34     const std::string DEVICE_B = "deviceB";
35     const std::string DEVICE_C = "deviceC";
36     const int LOCAL_WATER_MARK_NOT_INIT = 0xaa;
37     const int EIGHT_HUNDRED = 800;
38     const int NORMAL_SYNC_SEND_REQUEST_CNT = 3;
39     const int TWO_CNT = 2;
40     const int SLEEP_MILLISECONDS = 500;
41     const int TEN_SECONDS = 10;
42     const int THREE_HUNDRED = 300;
43     const int WAIT_30_SECONDS = 30000;
44     const int WAIT_40_SECONDS = 40000;
45     const int TIMEOUT_6_SECONDS = 6000;
46 
47     KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
48     KvStoreConfig g_config;
49     DistributedDBToolsUnitTest g_tool;
50     DBStatus g_kvDelegateStatus = INVALID_ARGS;
51     KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
52     VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
53     KvVirtualDevice* g_deviceB = nullptr;
54     KvVirtualDevice* g_deviceC = nullptr;
55     VirtualSingleVerSyncDBInterface *g_syncInterfaceB = nullptr;
56     VirtualSingleVerSyncDBInterface *g_syncInterfaceC = nullptr;
57 
58     // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
59     auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
60         placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
61 #ifndef LOW_LEVEL_MEM_DEV
62     const int KEY_LEN = 20; // 20 Bytes
63     const int VALUE_LEN = 4 * 1024 * 1024; // 4MB
64     const int ENTRY_NUM = 2; // 16 entries
65 #endif
66 }
67 
68 class DistributedDBSingleVerP2PSyncCheckTest : public testing::Test {
69 public:
70     static void SetUpTestCase(void);
71     static void TearDownTestCase(void);
72     void SetUp();
73     void TearDown();
74 };
75 
SetUpTestCase(void)76 void DistributedDBSingleVerP2PSyncCheckTest::SetUpTestCase(void)
77 {
78     /**
79      * @tc.setup: Init datadir and Virtual Communicator.
80      */
81     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
82     g_config.dataDir = g_testDir;
83     g_mgr.SetKvStoreConfig(g_config);
84 
85     string dir = g_testDir + "/single_ver";
86     DIR* dirTmp = opendir(dir.c_str());
87     if (dirTmp == nullptr) {
88         OS::MakeDBDirectory(dir);
89     } else {
90         closedir(dirTmp);
91     }
92 
93     g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
94     ASSERT_TRUE(g_communicatorAggregator != nullptr);
95     RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
96 
97     std::shared_ptr<ProcessSystemApiAdapterImpl> g_adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
98     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(g_adapter);
99 }
100 
TearDownTestCase(void)101 void DistributedDBSingleVerP2PSyncCheckTest::TearDownTestCase(void)
102 {
103     /**
104      * @tc.teardown: Release virtual Communicator and clear data dir.
105      */
106     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
107         LOGE("rm test db files error!");
108     }
109     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
110     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
111 }
112 
SetUp(void)113 void DistributedDBSingleVerP2PSyncCheckTest::SetUp(void)
114 {
115     DistributedDBToolsUnitTest::PrintTestCaseInfo();
116     /**
117      * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
118      */
119     KvStoreNbDelegate::Option option;
120     option.secOption.securityLabel = SecurityLabel::S3;
121     option.secOption.securityFlag = SecurityFlag::SECE;
122     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
123     ASSERT_TRUE(g_kvDelegateStatus == OK);
124     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
125     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
126     ASSERT_TRUE(g_deviceB != nullptr);
127     g_syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
128     ASSERT_TRUE(g_syncInterfaceB != nullptr);
129     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, g_syncInterfaceB), E_OK);
130 
131     g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
132     ASSERT_TRUE(g_deviceC != nullptr);
133     g_syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
134     ASSERT_TRUE(g_syncInterfaceC != nullptr);
135     ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, g_syncInterfaceC), E_OK);
136 }
137 
TearDown(void)138 void DistributedDBSingleVerP2PSyncCheckTest::TearDown(void)
139 {
140     /**
141      * @tc.teardown: Release device A, B, C
142      */
143     if (g_kvDelegatePtr != nullptr) {
144         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
145         g_kvDelegatePtr = nullptr;
146         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
147         LOGD("delete kv store status %d", status);
148         ASSERT_TRUE(status == OK);
149     }
150     if (g_deviceB != nullptr) {
151         delete g_deviceB;
152         g_deviceB = nullptr;
153     }
154     if (g_deviceC != nullptr) {
155         delete g_deviceC;
156         g_deviceC = nullptr;
157     }
158     if (g_communicatorAggregator != nullptr) {
159         g_communicatorAggregator->RegOnDispatch(nullptr);
160     }
161 }
162 
163 /**
164  * @tc.name: sec option check Sync 001
165  * @tc.desc: if sec option not equal, forbid sync
166  * @tc.type: FUNC
167  * @tc.require: AR000EV1G6
168  * @tc.author: wangchuanqing
169  */
170 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck001, TestSize.Level1)
171 {
172     DBStatus status = OK;
173     std::vector<std::string> devices;
174     devices.push_back(g_deviceB->GetDeviceId());
175     devices.push_back(g_deviceC->GetDeviceId());
176 
177     /**
178      * @tc.steps: step1. deviceA put {k1, v1}
179      */
180     Key key = {'1'};
181     Value value = {'1'};
182     status = g_kvDelegatePtr->Put(key, value);
183     ASSERT_TRUE(status == OK);
184 
185     ASSERT_TRUE(g_syncInterfaceB != nullptr);
186     ASSERT_TRUE(g_syncInterfaceC != nullptr);
187     SecurityOption secOption{SecurityLabel::S4, SecurityFlag::ECE};
188     g_syncInterfaceB->SetSecurityOption(secOption);
189     g_syncInterfaceC->SetSecurityOption(secOption);
190 
191     /**
192      * @tc.steps: step2. deviceA call sync and wait
193      * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
194      */
195     std::map<std::string, DBStatus> result;
196     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
197     ASSERT_TRUE(status == OK);
198 
199     ASSERT_TRUE(result.size() == devices.size());
200     for (const auto &pair : result) {
201         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
202         EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
203     }
204     VirtualDataItem item;
205     g_deviceB->GetData(key, item);
206     EXPECT_TRUE(item.value.empty());
207     g_deviceC->GetData(key, item);
208     EXPECT_TRUE(item.value.empty());
209 }
210 
211 /**
212  * @tc.name: sec option check Sync 002
213  * @tc.desc: if sec option not equal, forbid sync
214  * @tc.type: FUNC
215  * @tc.require: AR000EV1G6
216  * @tc.author: wangchuanqing
217  */
218 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck002, TestSize.Level1)
219 {
220     DBStatus status = OK;
221     std::vector<std::string> devices;
222     devices.push_back(g_deviceB->GetDeviceId());
223     devices.push_back(g_deviceC->GetDeviceId());
224 
225     /**
226      * @tc.steps: step1. deviceA put {k1, v1}
227      */
228     Key key = {'1'};
229     Value value = {'1'};
230     status = g_kvDelegatePtr->Put(key, value);
231     ASSERT_TRUE(status == OK);
232 
233     ASSERT_TRUE(g_syncInterfaceC != nullptr);
234     SecurityOption secOption{SecurityLabel::S4, SecurityFlag::ECE};
235     g_syncInterfaceC->SetSecurityOption(secOption);
236     secOption.securityLabel = SecurityLabel::S3;
237     secOption.securityFlag = SecurityFlag::SECE;
238     g_syncInterfaceB->SetSecurityOption(secOption);
239 
240     /**
241      * @tc.steps: step2. deviceA call sync and wait
242      * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
243      */
244     std::map<std::string, DBStatus> result;
245     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
246     ASSERT_TRUE(status == OK);
247 
248     ASSERT_TRUE(result.size() == devices.size());
249     for (const auto &pair : result) {
250         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
251         if (pair.first == DEVICE_B) {
252             EXPECT_TRUE(pair.second == OK);
253         } else {
254             EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
255         }
256     }
257     VirtualDataItem item;
258     g_deviceC->GetData(key, item);
259     EXPECT_TRUE(item.value.empty());
260     g_deviceB->GetData(key, item);
261     EXPECT_TRUE(item.value == value);
262 }
263 
264 /**
265  * @tc.name: sec option check Sync 003
266  * @tc.desc: if sec option equal, check not pass, forbid sync
267  * @tc.type: FUNC
268  * @tc.require: AR000EV1G6
269  * @tc.author: zhangqiquan
270  */
271 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck003, TestSize.Level1)
272 {
273     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
274     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon5f4525660202(const std::string &, const SecurityOption &) 275     adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
276         return false;
277     });
278     /**
279      * @tc.steps: step1. record packet
280      * @tc.expected: step1. sync should failed in source.
281      */
282     std::atomic<int> messageCount = 0;
__anon5f4525660302(const std::string &, Message *) 283     g_communicatorAggregator->RegOnDispatch([&messageCount](const std::string &, Message *) {
284         messageCount++;
285     });
286     /**
287      * @tc.steps: step2. deviceA call sync and wait
288      * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
289      */
290     DBStatus status = OK;
291     std::vector<std::string> devices;
292     devices.push_back(g_deviceB->GetDeviceId());
293     std::map<std::string, DBStatus> result;
294     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
295     EXPECT_EQ(status, OK);
296     EXPECT_EQ(messageCount, 6); // 6 = 2 time sync + 4 ability sync
297     for (const auto &pair : result) {
298         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
299         EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
300     }
301     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
302     g_communicatorAggregator->RegOnDispatch(nullptr);
303 }
304 
305 /**
306  * @tc.name: sec option check Sync 004
307  * @tc.desc: memory db not check device security
308  * @tc.type: FUNC
309  * @tc.require:
310  * @tc.author: zhangqiquan
311  */
312 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck004, TestSize.Level1)
313 {
314     ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
315     g_kvDelegatePtr = nullptr;
316     KvStoreNbDelegate::Option option;
317     option.secOption.securityLabel = SecurityLabel::NOT_SET;
318     option.isMemoryDb = true;
319     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
320     ASSERT_TRUE(g_kvDelegateStatus == OK);
321     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
322 
323     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
324     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon5f4525660402(const std::string &, const SecurityOption &) 325     adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
326         return false;
327     });
__anon5f4525660502(const std::string &, SecurityOption &securityOption) 328     adapter->ForkGetSecurityOption([](const std::string &, SecurityOption &securityOption) {
329         securityOption.securityLabel = NOT_SET;
330         return OK;
331     });
__anon5f4525660602(SecurityOption &) 332     g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &) {
333         return -E_NOT_SUPPORT;
334     });
335 
336     std::vector<std::string> devices;
337     devices.push_back(g_deviceB->GetDeviceId());
338     std::map<std::string, DBStatus> result;
339     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
340     EXPECT_EQ(status, OK);
341     for (const auto &pair : result) {
342         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
343         EXPECT_TRUE(pair.second == OK);
344     }
345 
346     adapter->ForkCheckDeviceSecurityAbility(nullptr);
347     adapter->ForkGetSecurityOption(nullptr);
348     g_syncInterfaceB->ForkGetSecurityOption(nullptr);
349 }
350 
351 #ifndef LOW_LEVEL_MEM_DEV
352 /**
353  * @tc.name: BigDataSync001
354  * @tc.desc: big data sync push mode.
355  * @tc.type: FUNC
356  * @tc.require: AR000F3OOU
357  * @tc.author: wangchuanqing
358  */
359 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync001, TestSize.Level1)
360 {
361     DBStatus status = OK;
362     std::vector<std::string> devices;
363     devices.push_back(g_deviceB->GetDeviceId());
364     devices.push_back(g_deviceC->GetDeviceId());
365 
366     /**
367      * @tc.steps: step1. deviceA put 16 bigData
368      */
369     std::vector<Entry> entries;
370     std::vector<Key> keys;
371     DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
372     for (const auto &entry : entries) {
373         status = g_kvDelegatePtr->Put(entry.key, entry.value);
374         ASSERT_TRUE(status == OK);
375     }
376 
377     /**
378      * @tc.steps: step2. deviceA call sync and wait
379      * @tc.expected: step2. sync should return OK.
380      */
381     std::map<std::string, DBStatus> result;
382     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
383     ASSERT_TRUE(status == OK);
384 
385     /**
386      * @tc.expected: step2. onComplete should be called, DeviceB,C have {k1,v1}
387      */
388     ASSERT_TRUE(result.size() == devices.size());
389     for (const auto &pair : result) {
390         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
391         EXPECT_TRUE(pair.second == OK);
392     }
393     VirtualDataItem item;
394     for (const auto &entry : entries) {
395         item.value.clear();
396         g_deviceB->GetData(entry.key, item);
397         EXPECT_TRUE(item.value == entry.value);
398         item.value.clear();
399         g_deviceC->GetData(entry.key, item);
400         EXPECT_TRUE(item.value == entry.value);
401     }
402 }
403 
404 /**
405  * @tc.name: BigDataSync002
406  * @tc.desc: big data sync pull mode.
407  * @tc.type: FUNC
408  * @tc.require: AR000F3OOU
409  * @tc.author: wangchuanqing
410  */
411 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync002, TestSize.Level1)
412 {
413     DBStatus status = OK;
414     std::vector<std::string> devices;
415     devices.push_back(g_deviceB->GetDeviceId());
416     devices.push_back(g_deviceC->GetDeviceId());
417 
418     /**
419      * @tc.steps: step1. deviceA deviceB put bigData
420      */
421     std::vector<Entry> entries;
422     std::vector<Key> keys;
423     DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
424 
425     for (uint32_t i = 0; i < entries.size(); i++) {
426         if (i % 2 == 0) {
427             g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
428         } else {
429             g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
430         }
431     }
432 
433     /**
434      * @tc.steps: step3. deviceA call pull sync
435      * @tc.expected: step3. sync should return OK.
436      */
437     std::map<std::string, DBStatus> result;
438     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
439     ASSERT_TRUE(status == OK);
440 
441     /**
442      * @tc.expected: step3. onComplete should be called, DeviceA have all bigData
443      */
444     ASSERT_TRUE(result.size() == devices.size());
445     for (const auto &pair : result) {
446         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
447         EXPECT_TRUE(pair.second == OK);
448     }
449     for (const auto &entry : entries) {
450         Value value;
451         EXPECT_EQ(g_kvDelegatePtr->Get(entry.key, value), OK);
452         EXPECT_EQ(value, entry.value);
453     }
454 }
455 
456 /**
457  * @tc.name: BigDataSync003
458  * @tc.desc: big data sync pushAndPull mode.
459  * @tc.type: FUNC
460  * @tc.require: AR000F3OOV
461  * @tc.author: wangchuanqing
462  */
463 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync003, TestSize.Level1)
464 {
465     DBStatus status = OK;
466     std::vector<std::string> devices;
467     devices.push_back(g_deviceB->GetDeviceId());
468     devices.push_back(g_deviceC->GetDeviceId());
469 
470     /**
471      * @tc.steps: step1. deviceA deviceB put bigData
472      */
473     std::vector<Entry> entries;
474     std::vector<Key> keys;
475     DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
476 
477     for (uint32_t i = 0; i < entries.size(); i++) {
478         if (i % 3 == 0) { // 0 3 6 9 12 15 for deivec B
479             g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
480         } else if (i % 3 == 1) { // 1 4 7 10 13 16 for device C
481             g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
482         } else { // 2 5 8 11 14 for device A
483             status = g_kvDelegatePtr->Put(entries[i].key, entries[i].value);
484             ASSERT_TRUE(status == OK);
485         }
486     }
487 
488     /**
489      * @tc.steps: step3. deviceA call pushAndpull sync
490      * @tc.expected: step3. sync should return OK.
491      */
492     std::map<std::string, DBStatus> result;
493     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
494     ASSERT_TRUE(status == OK);
495 
496     /**
497      * @tc.expected: step3. onComplete should be called, DeviceA have all bigData
498      * deviceB and deviceC has deviceA data
499      */
500     ASSERT_TRUE(result.size() == devices.size());
501     for (const auto &pair : result) {
502         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
503         EXPECT_TRUE(pair.second == OK);
504     }
505 
506     VirtualDataItem item;
507     for (uint32_t i = 0; i < entries.size(); i++) {
508         Value value;
509         EXPECT_EQ(g_kvDelegatePtr->Get(entries[i].key, value), OK);
510         EXPECT_EQ(value, entries[i].value);
511 
512         if (i % 3 == 2) { // 2 5 8 11 14 for device A
513         item.value.clear();
514         g_deviceB->GetData(entries[i].key, item);
515         EXPECT_TRUE(item.value == entries[i].value);
516         item.value.clear();
517         g_deviceC->GetData(entries[i].key, item);
518         EXPECT_TRUE(item.value == entries[i].value);
519         }
520     }
521 }
522 #endif
523 
524 /**
525  * @tc.name: PushFinishedNotify 001
526  * @tc.desc: Test remote device push finished notify function.
527  * @tc.type: FUNC
528  * @tc.require: AR000CQS3S
529  * @tc.author: xushaohua
530  */
531 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, PushFinishedNotify001, TestSize.Level1)
532 {
533     std::vector<std::string> devices;
534     devices.push_back(g_deviceB->GetDeviceId());
535 
536     /**
537      * @tc.steps: step1. deviceA call SetRemotePushFinishedNotify
538      * @tc.expected: step1. set should return OK.
539      */
540     int pushfinishedFlag = 0;
541     DBStatus status = g_kvDelegatePtr->SetRemotePushFinishedNotify(
__anon5f4525660702(const RemotePushNotifyInfo &info) 542         [&pushfinishedFlag](const RemotePushNotifyInfo &info) {
543             EXPECT_TRUE(info.deviceId == DEVICE_B);
544             pushfinishedFlag = 1;
545     });
546     ASSERT_EQ(status, OK);
547 
548     /**
549      * @tc.steps: step2. deviceB put k2, v2, and deviceA pull from deviceB
550      * @tc.expected: step2. deviceA can not receive push finished notify
551      */
552     EXPECT_EQ(g_kvDelegatePtr->Put(KEY_2, VALUE_2), OK);
553     std::map<std::string, DBStatus> result;
554     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
555     EXPECT_TRUE(status == OK);
556     EXPECT_EQ(pushfinishedFlag, 0);
557     pushfinishedFlag = 0;
558 
559     /**
560      * @tc.steps: step3. deviceB put k3, v3, and deviceA push and pull to deviceB
561      * @tc.expected: step3. deviceA can not receive push finished notify
562      */
563     EXPECT_EQ(g_kvDelegatePtr->Put(KEY_3, VALUE_3), OK);
564     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
565     EXPECT_TRUE(status == OK);
566     EXPECT_EQ(pushfinishedFlag, 0);
567     pushfinishedFlag = 0;
568 
569     /**
570      * @tc.steps: step4. deviceA call SetRemotePushFinishedNotify to reset notify
571      * @tc.expected: step4. set should return OK.
572      */
__anon5f4525660802(const RemotePushNotifyInfo &info) 573     status = g_kvDelegatePtr->SetRemotePushFinishedNotify([&pushfinishedFlag](const RemotePushNotifyInfo &info) {
574         EXPECT_TRUE(info.deviceId == DEVICE_B);
575         pushfinishedFlag = 2;
576     });
577     ASSERT_EQ(status, OK);
578 
579     /**
580      * @tc.steps: step5. deviceA call SetRemotePushFinishedNotify set null to unregist
581      * @tc.expected: step5. set should return OK.
582      */
583     status = g_kvDelegatePtr->SetRemotePushFinishedNotify(nullptr);
584     ASSERT_EQ(status, OK);
585 }
586 
587 namespace {
RegOnDispatchWithDelayAck(bool & errCodeAck,bool & afterErrAck)588 void RegOnDispatchWithDelayAck(bool &errCodeAck, bool &afterErrAck)
589 {
590     // just delay the busy ack
591     g_communicatorAggregator->RegOnDispatch([&errCodeAck, &afterErrAck](const std::string &dev, Message *inMsg) {
592         if (dev != g_deviceB->GetDeviceId()) {
593             return;
594         }
595         auto *packet = inMsg->GetObject<DataAckPacket>();
596         if (packet->GetRecvCode() == -E_BUSY) {
597             errCodeAck = true;
598             while (!afterErrAck) {
599             }
600             LOGW("NOW SEND BUSY ACK");
601         } else if (errCodeAck) {
602             afterErrAck = true;
603             std::this_thread::sleep_for(std::chrono::seconds(1));
604         }
605     });
606 }
607 
RegOnDispatchWithOffline(bool & offlineFlag,bool & invalid,condition_variable & conditionOffline)608 void RegOnDispatchWithOffline(bool &offlineFlag, bool &invalid, condition_variable &conditionOffline)
609 {
610     g_communicatorAggregator->RegOnDispatch([&offlineFlag, &invalid, &conditionOffline](
611                                                 const std::string &dev, Message *inMsg) {
612         auto *packet = inMsg->GetObject<DataAckPacket>();
613         if (dev != DEVICE_B) {
614             if (packet->GetRecvCode() == LOCAL_WATER_MARK_NOT_INIT) {
615                 offlineFlag = true;
616                 conditionOffline.notify_all();
617                 LOGW("[Dispatch] NOTIFY OFFLINE");
618                 std::this_thread::sleep_for(std::chrono::microseconds(EIGHT_HUNDRED));
619             }
620         } else if (!invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
621             LOGW("[Dispatch] NOW INVALID THIS MSG");
622             inMsg->SetMessageType(TYPE_INVALID);
623             inMsg->SetMessageId(INVALID_MESSAGE_ID);
624             invalid = true;
625         }
626     });
627 }
628 
RegOnDispatchWithInvalidMsg(bool & invalid)629 void RegOnDispatchWithInvalidMsg(bool &invalid)
630 {
631     g_communicatorAggregator->RegOnDispatch([&invalid](
632         const std::string &dev, Message *inMsg) {
633         if (dev == DEVICE_B && !invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
634             LOGW("[Dispatch] NOW INVALID THIS MSG");
635             inMsg->SetMessageType(TYPE_INVALID);
636             inMsg->SetMessageId(INVALID_MESSAGE_ID);
637             invalid = true;
638         }
639     });
640 }
641 
PrepareEnv(vector<std::string> & devices,Key & key,Query & query)642 void PrepareEnv(vector<std::string> &devices, Key &key, Query &query)
643 {
644     /**
645      * @tc.steps: step1. ensure the watermark is no zero and finish timeSync and abilitySync
646      * @tc.expected: step1. should return OK.
647      */
648     Value value = {'1'};
649     std::map<std::string, DBStatus> result;
650     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
651 
652     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result, query);
653     EXPECT_TRUE(status == OK);
654     ASSERT_TRUE(result[g_deviceB->GetDeviceId()] == OK);
655 }
656 
Sync(vector<std::string> & devices,const DBStatus & targetStatus)657 void Sync(vector<std::string> &devices, const DBStatus &targetStatus)
658 {
659     std::map<std::string, DBStatus> result;
660     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result);
661     EXPECT_TRUE(status == OK);
662     for (const auto &deviceId : devices) {
663         ASSERT_TRUE(result[deviceId] == targetStatus);
664     }
665 }
666 
SyncWithQuery(vector<std::string> & devices,const Query & query,const SyncMode & mode,const DBStatus & targetStatus)667 void SyncWithQuery(vector<std::string> &devices, const Query &query, const SyncMode &mode,
668     const DBStatus &targetStatus)
669 {
670     std::map<std::string, DBStatus> result;
671     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, mode, result, query);
672     EXPECT_TRUE(status == OK);
673     for (const auto &deviceId : devices) {
674         ASSERT_TRUE(result[deviceId] == targetStatus);
675     }
676 }
677 
SyncWithQuery(vector<std::string> & devices,const Query & query,const DBStatus & targetStatus)678 void SyncWithQuery(vector<std::string> &devices, const Query &query, const DBStatus &targetStatus)
679 {
680     SyncWithQuery(devices, query, DistributedDB::SYNC_MODE_PUSH_ONLY, targetStatus);
681 }
682 
SyncWithDeviceOffline(vector<std::string> & devices,Key & key,const Query & query)683 void SyncWithDeviceOffline(vector<std::string> &devices, Key &key, const Query &query)
684 {
685     Value value = {'2'};
686     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
687 
688     /**
689      * @tc.steps: step2. invalid the sync msg
690      * @tc.expected: step2. should return TIME_OUT.
691      */
692     SyncWithQuery(devices, query, TIME_OUT);
693 
694     /**
695      * @tc.steps: step3. device offline when sync
696      * @tc.expected: step3. should return COMM_FAILURE.
697      */
698     SyncWithQuery(devices, query, COMM_FAILURE);
699 }
700 
PrepareWaterMarkError(std::vector<std::string> & devices,Query & query)701 void PrepareWaterMarkError(std::vector<std::string> &devices, Query &query)
702 {
703     /**
704      * @tc.steps: step1. prepare data
705      */
706     devices.push_back(g_deviceB->GetDeviceId());
707     g_deviceB->Online();
708 
709     Key key = {'1'};
710     query = Query::Select().PrefixKey(key);
711     PrepareEnv(devices, key, query);
712 
713     /**
714      * @tc.steps: step2. query sync and set queryWaterMark
715      * @tc.expected: step2. should return OK.
716      */
717     Value value = {'2'};
718     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
719     SyncWithQuery(devices, query, OK);
720 
721     /**
722      * @tc.steps: step3. sync and invalid msg for set local device waterMark
723      * @tc.expected: step3. should return TIME_OUT.
724      */
725     bool invalidMsg = false;
726     RegOnDispatchWithInvalidMsg(invalidMsg);
727     value = {'3'};
728     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
729     Sync(devices, TIME_OUT);
730     g_communicatorAggregator->RegOnDispatch(nullptr);
731 }
732 }
733 
734 /**
735  * @tc.name: AckSessionCheck 001
736  * @tc.desc: Test ack session check function.
737  * @tc.type: FUNC
738  * @tc.require: AR000F3OOV
739  * @tc.author: zhangqiquan
740  */
741 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, AckSessionCheck001, TestSize.Level3)
742 {
743     std::vector<std::string> devices;
744     devices.push_back(g_deviceB->GetDeviceId());
745 
746     /**
747      * @tc.steps: step1. deviceB sync to deviceA just for timeSync and abilitySync
748      * @tc.expected: step1. should return OK.
749      */
750     ASSERT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
751 
752     /**
753      * @tc.steps: step2. deviceA StartTransaction for prevent other sync action deviceB sync will fail
754      * @tc.expected: step2. should return OK.
755      */
756     ASSERT_TRUE(g_kvDelegatePtr->StartTransaction() == OK);
757 
758     bool errCodeAck = false;
759     bool afterErrAck = false;
760     RegOnDispatchWithDelayAck(errCodeAck, afterErrAck);
761 
762     Key key = {'1'};
763     Value value = {'1'};
764     Timestamp currentTime;
765     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
766     EXPECT_TRUE(g_deviceB->PutData(key, value, currentTime, 0) == OK);
767     EXPECT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
768 
769     Value outValue;
770     EXPECT_TRUE(g_kvDelegatePtr->Get(key, outValue) == NOT_FOUND);
771 
772     /**
773      * @tc.steps: step3. release the writeHandle and try again, sync success
774      * @tc.expected: step3. should return OK.
775      */
776     EXPECT_TRUE(g_kvDelegatePtr->Commit() == OK);
777     EXPECT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
778 
779     EXPECT_TRUE(g_kvDelegatePtr->Get(key, outValue) == E_OK);
780     EXPECT_EQ(outValue, value);
781 }
782 
783 /**
784  * @tc.name: AckSafeCheck001
785  * @tc.desc: Test ack session check filter all bad ack in device offline scene.
786  * @tc.type: FUNC
787  * @tc.require: AR000F3OOV
788  * @tc.author: zhangqiquan
789  */
790 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, AckSafeCheck001, TestSize.Level3)
791 {
792     std::vector<std::string> devices;
793     devices.push_back(g_deviceB->GetDeviceId());
794     g_deviceB->Online();
795 
796     Key key = {'1'};
797     Query query = Query::Select().PrefixKey(key);
798     PrepareEnv(devices, key, query);
799 
800     std::condition_variable conditionOnline;
801     std::condition_variable conditionOffline;
802     bool onlineFlag = false;
803     bool invalid = false;
804     bool offlineFlag = false;
__anon5f4525660d02() 805     thread subThread([&onlineFlag, &conditionOnline, &offlineFlag, &conditionOffline]() {
806         LOGW("[Dispatch] NOW DEVICES IS OFFLINE");
807         std::mutex offlineMtx;
808         std::unique_lock<std::mutex> lck(offlineMtx);
809         conditionOffline.wait(lck, [&offlineFlag]{ return offlineFlag; });
810         g_deviceB->Offline();
811         std::this_thread::sleep_for(std::chrono::milliseconds(100));
812         g_deviceB->Online();
813         onlineFlag = true;
814         conditionOnline.notify_all();
815         LOGW("[Dispatch] NOW DEVICES IS ONLINE");
816     });
817     subThread.detach();
818 
819     RegOnDispatchWithOffline(offlineFlag, invalid, conditionOffline);
820 
821     SyncWithDeviceOffline(devices, key, query);
822 
823     std::mutex onlineMtx;
824     std::unique_lock<std::mutex> lck(onlineMtx);
__anon5f4525660f02null825     conditionOnline.wait(lck, [&onlineFlag]{ return onlineFlag; });
826 
827     /**
828      * @tc.steps: step4. sync again if has problem it will sync never end
829      * @tc.expected: step4. should return OK.
830      */
831     SyncWithQuery(devices, query, OK);
832 }
833 
834 /**
835  * @tc.name: WaterMarkCheck001
836  * @tc.desc: Test waterMark work correct in lost package scene.
837  * @tc.type: FUNC
838  * @tc.require: AR000F3OOV
839  * @tc.author: zhangqiquan
840  */
841 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, WaterMarkCheck001, TestSize.Level1)
842 {
843     std::vector<std::string> devices;
844     Query query = Query::Select();
845     PrepareWaterMarkError(devices, query);
846 
847     /**
848      * @tc.steps: step4. sync again see it work correct
849      * @tc.expected: step4. should return OK.
850      */
851     SyncWithQuery(devices, query, OK);
852 }
853 
854 /**
855  * @tc.name: WaterMarkCheck002
856  * @tc.desc: Test pull work correct in error waterMark scene.
857  * @tc.type: FUNC
858  * @tc.require: AR000F3OOV
859  * @tc.author: zhangqiquan
860  */
861 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, WaterMarkCheck002, TestSize.Level1)
862 {
863     std::vector<std::string> devices;
864     Query query = Query::Select();
865     PrepareWaterMarkError(devices, query);
866 
867     /**
868      * @tc.steps: step4. sync again see it work correct
869      * @tc.expected: step4. should return OK.
870      */
871     Key key = {'2'};
872     ASSERT_TRUE(g_kvDelegatePtr->Put(key, {}) == OK);
873     query = Query::Select();
874     SyncWithQuery(devices, query, DistributedDB::SYNC_MODE_PULL_ONLY, OK);
875 
876     VirtualDataItem item;
877     EXPECT_EQ(g_deviceB->GetData(key, item), -E_NOT_FOUND);
878 }
879 
RegOnDispatchToGetSyncCount(int & sendRequestCount,int sleepMs=0)880 void RegOnDispatchToGetSyncCount(int &sendRequestCount, int sleepMs = 0)
881 {
882     g_communicatorAggregator->RegOnDispatch([sleepMs, &sendRequestCount](
883             const std::string &dev, Message *inMsg) {
884         if (dev == DEVICE_B && inMsg->GetMessageType() == TYPE_REQUEST) {
885             std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
886             sendRequestCount++;
887             LOGD("sendRequestCount++...");
888         }
889     });
890 }
891 
TestDifferentSyncMode(SyncMode mode)892 void TestDifferentSyncMode(SyncMode mode)
893 {
894     std::vector<std::string> devices;
895     devices.push_back(g_deviceB->GetDeviceId());
896 
897     /**
898      * @tc.steps: step1. deviceA put {k1, v1}
899      */
900     Key key = {'1'};
901     Value value = {'1'};
902     DBStatus status = g_kvDelegatePtr->Put(key, value);
903     ASSERT_TRUE(status == OK);
904 
905     int sendRequestCount = 0;
906     RegOnDispatchToGetSyncCount(sendRequestCount);
907 
908     /**
909      * @tc.steps: step2. deviceA call sync and wait
910      * @tc.expected: step2. sync should return OK.
911      */
912     std::map<std::string, DBStatus> result;
913     status = g_tool.SyncTest(g_kvDelegatePtr, devices, mode, result);
914     ASSERT_TRUE(status == OK);
915 
916     /**
917      * @tc.expected: step2. onComplete should be called, DeviceB have {k1,v1}, send request message 3 times
918      */
919     ASSERT_TRUE(result.size() == devices.size());
920     for (const auto &pair : result) {
921         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
922         EXPECT_TRUE(pair.second == OK);
923     }
924     VirtualDataItem item;
925     g_deviceB->GetData(key, item);
926     EXPECT_TRUE(item.value == value);
927 
928     EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
929 
930     /**
931      * @tc.steps: step3. reset sendRequestCount to 0, deviceA call sync and wait again without any change in db
932      * @tc.expected: step3. sync should return OK, and sendRequestCount should be 1, because this merge can not
933      * be skipped
934      */
935     sendRequestCount = 0;
936     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
937     ASSERT_TRUE(status == OK);
938     EXPECT_EQ(sendRequestCount, 1);
939 }
940 
941 /**
942  * @tc.name: PushSyncMergeCheck001
943  * @tc.desc: Test push sync task merge, task can not be merged when the two sync task is not in the queue
944  * at the same time.
945  * @tc.type: FUNC
946  * @tc.require: AR000F3OOV
947  * @tc.author: zhangshijie
948  */
949 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck001, TestSize.Level1)
950 {
951     TestDifferentSyncMode(SYNC_MODE_PUSH_ONLY);
952 }
953 
954 /**
955  * @tc.name: PushSyncMergeCheck002
956  * @tc.desc: Test push_pull sync task merge, task can not be merged when the two sync task is not in the queue
957  * at the same time.
958  * @tc.type: FUNC
959  * @tc.require: AR000F3OOV
960  * @tc.author: zhangshijie
961  */
962 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck002, TestSize.Level1)
963 {
964     TestDifferentSyncMode(SYNC_MODE_PUSH_PULL);
965 }
966 
PrepareForSyncMergeTest(std::vector<std::string> & devices,int & sendRequestCount)967 void PrepareForSyncMergeTest(std::vector<std::string> &devices, int &sendRequestCount)
968 {
969     /**
970      * @tc.steps: step1. deviceA put {k1, v1}
971      */
972     Key key = {'1'};
973     Value value = {'1'};
974     DBStatus status = g_kvDelegatePtr->Put(key, value);
975     ASSERT_TRUE(status == OK);
976 
977     RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
978 
979     /**
980      * @tc.steps: step2. deviceA call sync and don't wait
981      * @tc.expected: step2. sync should return OK.
982      */
983     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
984         [&sendRequestCount, devices, key, value](const std::map<std::string, DBStatus>& statusMap) {
985         ASSERT_TRUE(statusMap.size() == devices.size());
986         for (const auto &pair : statusMap) {
987             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
988             EXPECT_TRUE(pair.second == OK);
989         }
990         VirtualDataItem item;
991         g_deviceB->GetData(key, item);
992         EXPECT_EQ(item.value, value);
993         EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
994 
995         // reset sendRequestCount to 0
996         sendRequestCount = 0;
997     });
998     ASSERT_TRUE(status == OK);
999 }
1000 
1001 /**
1002  * @tc.name: PushSyncMergeCheck003
1003  * @tc.desc: Test push sync task merge, task can not be merged when there is change in db since last push sync
1004  * @tc.type: FUNC
1005  * @tc.require: AR000F3OOV
1006  * @tc.author: zhangshijie
1007  */
1008 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck003, TestSize.Level3)
1009 {
1010     DBStatus status = OK;
1011     std::vector<std::string> devices;
1012     devices.push_back(g_deviceB->GetDeviceId());
1013 
1014     int sendRequestCount = 0;
1015     PrepareForSyncMergeTest(devices, sendRequestCount);
1016 
1017     /**
1018      * @tc.steps: step3. deviceA call sync and don't wait
1019      * @tc.expected: step3. sync should return OK.
1020      */
1021     Key key = {'1'};
1022     Value value = {'2'};
1023     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon5f4525661202(const std::map<std::string, DBStatus>& statusMap) 1024         [&sendRequestCount, devices, key, value, this](const std::map<std::string, DBStatus>& statusMap) {
1025         /**
1026          * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1027          * skipped, but it is no need to do time sync and ability sync, only need to do data sync
1028          */
1029         ASSERT_TRUE(statusMap.size() == devices.size());
1030         for (const auto &pair : statusMap) {
1031             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1032             EXPECT_TRUE(pair.second == OK);
1033         }
1034         VirtualDataItem item;
1035         g_deviceB->GetData(key, item);
1036         EXPECT_EQ(item.value, value);
1037     });
1038     ASSERT_TRUE(status == OK);
1039 
1040     /**
1041      * @tc.steps: step4. deviceA put {k1, v2}
1042      */
1043     while (sendRequestCount < TWO_CNT) {
1044         std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1045     }
1046     status = g_kvDelegatePtr->Put(key, value);
1047     ASSERT_TRUE(status == OK);
1048     // wait for the second sync task finish
1049     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1050     EXPECT_EQ(sendRequestCount, 1);
1051 }
1052 
1053 /**
1054  * @tc.name: PushSyncMergeCheck004
1055  * @tc.desc: Test push sync task merge, task can be merged when there is no change in db since last push sync
1056  * @tc.type: FUNC
1057  * @tc.require: AR000F3OOV
1058  * @tc.author: zhangshijie
1059  */
1060 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck004, TestSize.Level3)
1061 {
1062     DBStatus status = OK;
1063     std::vector<std::string> devices;
1064     devices.push_back(g_deviceB->GetDeviceId());
1065 
1066     int sendRequestCount = 0;
1067     PrepareForSyncMergeTest(devices, sendRequestCount);
1068 
1069     /**
1070      * @tc.steps: step3. deviceA call sync and don't wait
1071      * @tc.expected: step3. sync should return OK.
1072      */
1073     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon5f4525661302(const std::map<std::string, DBStatus>& statusMap) 1074         [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1075         /**
1076          * @tc.expected: when the second sync task return, sendRequestCount should be 0, because this merge can  be
1077          * skipped
1078          */
1079         ASSERT_TRUE(statusMap.size() == devices.size());
1080         for (const auto &pair : statusMap) {
1081             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1082             EXPECT_TRUE(pair.second == OK);
1083         }
1084     });
1085     ASSERT_TRUE(status == OK);
1086     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1087     EXPECT_EQ(sendRequestCount, 0);
1088 }
1089 
RegOnDispatchWithInvalidMsgAndCnt(int & sendRequestCount,int sleepMs,bool & invalid)1090 void RegOnDispatchWithInvalidMsgAndCnt(int &sendRequestCount, int sleepMs, bool &invalid)
1091 {
1092     g_communicatorAggregator->RegOnDispatch([&sendRequestCount, sleepMs, &invalid](
1093         const std::string &dev, Message *inMsg) {
1094         if (dev == DEVICE_B && !invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
1095             inMsg->SetMessageType(TYPE_INVALID);
1096             inMsg->SetMessageId(INVALID_MESSAGE_ID);
1097             sendRequestCount++;
1098             invalid = true;
1099             LOGW("[Dispatch]invalid THIS MSG, sendRequestCount = %d", sendRequestCount);
1100             std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
1101         }
1102     });
1103 }
1104 
1105 /**
1106  * @tc.name: PushSyncMergeCheck005
1107  * @tc.desc: Test push sync task merge, task cannot be merged when the last push sync is failed
1108  * @tc.type: FUNC
1109  * @tc.require: AR000F3OOV
1110  * @tc.author: zhangshijie
1111  */
1112 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck005, TestSize.Level3)
1113 {
1114     DBStatus status = OK;
1115     std::vector<std::string> devices;
1116     devices.push_back(g_deviceB->GetDeviceId());
1117 
1118     /**
1119      * @tc.steps: step1. deviceA put {k1, v1}
1120      */
1121     Key key = {'1'};
1122     Value value = {'1'};
1123     status = g_kvDelegatePtr->Put(key, value);
1124     ASSERT_TRUE(status == OK);
1125 
1126     int sendRequestCount = 0;
1127     bool invalid = false;
1128     RegOnDispatchWithInvalidMsgAndCnt(sendRequestCount, SLEEP_MILLISECONDS, invalid);
1129 
1130     /**
1131      * @tc.steps: step2. deviceA call sync and don't wait
1132      * @tc.expected: step2. sync should return TIME_OUT.
1133      */
1134     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon5f4525661502(const std::map<std::string, DBStatus>& statusMap) 1135         [&sendRequestCount, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1136         ASSERT_TRUE(statusMap.size() == devices.size());
1137         for (const auto &deviceId : devices) {
1138             ASSERT_EQ(statusMap.at(deviceId), TIME_OUT);
1139         }
1140     });
1141     EXPECT_TRUE(status == OK);
1142 
1143     /**
1144      * @tc.steps: step3. deviceA call sync and don't wait
1145      * @tc.expected: step3. sync should return OK.
1146      */
1147     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon5f4525661602(const std::map<std::string, DBStatus>& statusMap) 1148         [key, value, &sendRequestCount, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1149         /**
1150          * @tc.expected: when the second sync task return, sendRequestCount should be 3, because this merge can not be
1151          * skipped, deviceB should have {k1, v1}.
1152          */
1153         ASSERT_TRUE(statusMap.size() == devices.size());
1154         for (const auto &pair : statusMap) {
1155             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1156             EXPECT_EQ(pair.second, OK);
1157         }
1158         VirtualDataItem item;
1159         g_deviceB->GetData(key, item);
1160         EXPECT_EQ(item.value, value);
1161     });
1162     ASSERT_TRUE(status == OK);
1163     while (sendRequestCount < 1) {
1164         std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1165     }
1166     sendRequestCount = 0;
1167     RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1168 
1169     // wait for the second sync task finish
1170     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1171     EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1172 }
1173 
PrePareForQuerySyncMergeTest(bool isQuerySync,std::vector<std::string> & devices,Key & key,Value & value,int & sendRequestCount)1174 void PrePareForQuerySyncMergeTest(bool isQuerySync, std::vector<std::string> &devices,
1175     Key &key, Value &value, int &sendRequestCount)
1176 {
1177     DBStatus status = OK;
1178     /**
1179      * @tc.steps: step1. deviceA put {k1, v1}...{k10, v10}
1180      */
1181     Query query = Query::Select().PrefixKey(key);
1182     const int dataSize = 10;
1183     for (int i = 0; i < dataSize; i++) {
1184         key.push_back(i);
1185         value.push_back(i);
1186         status = g_kvDelegatePtr->Put(key, value);
1187         ASSERT_TRUE(status == OK);
1188         key.pop_back();
1189         value.pop_back();
1190     }
1191 
1192     RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1193     /**
1194      * @tc.steps: step2. deviceA call query sync and don't wait
1195      * @tc.expected: step2. sync should return OK.
1196      */
1197     auto completeCallBack = [&sendRequestCount, &key, &value, dataSize, devices]
1198         (const std::map<std::string, DBStatus>& statusMap) {
1199         ASSERT_TRUE(statusMap.size() == devices.size());
1200         for (const auto &pair : statusMap) {
1201             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1202             EXPECT_EQ(pair.second, OK);
1203         }
1204         // when first sync finish, DeviceB have {k1,v1}, {k3,v3}, {k5,v5} .. send request message 3 times
1205         VirtualDataItem item;
1206         for (int i = 0; i < dataSize; i++) {
1207             key.push_back(i);
1208             value.push_back(i);
1209             g_deviceB->GetData(key, item);
1210             EXPECT_EQ(item.value, value);
1211             key.pop_back();
1212             value.pop_back();
1213         }
1214         EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1215         // reset sendRequestCount to 0
1216         sendRequestCount = 0;
1217     };
1218     if (isQuerySync) {
1219         status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, completeCallBack, query, false);
1220     } else {
1221         status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, completeCallBack);
1222     }
1223     ASSERT_TRUE(status == OK);
1224 }
1225 
1226 /**
1227  * @tc.name: QuerySyncMergeCheck001
1228  * @tc.desc: Test query push sync task merge, task can be merged when there is no change in db since last query sync
1229  * @tc.type: FUNC
1230  * @tc.require: AR000F3OOV
1231  * @tc.author: zhangshijie
1232  */
1233 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck001, TestSize.Level3)
1234 {
1235     std::vector<std::string> devices;
1236     int sendRequestCount = 0;
1237     devices.push_back(g_deviceB->GetDeviceId());
1238 
1239     Key key {'1'};
1240     Value value {'1'};
1241     Query query = Query::Select().PrefixKey(key);
1242     PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1243 
1244     /**
1245      * @tc.steps: step3. deviceA call query sync and don't wait
1246      * @tc.expected: step3. sync should return OK.
1247      */
1248     DBStatus status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon5f4525661802(const std::map<std::string, DBStatus>& statusMap) 1249         [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1250         /**
1251          * @tc.expected: when the second sync task return, sendRequestCount should be 0, because this merge can be
1252          * skipped because there is no change in db since last query sync
1253          */
1254         ASSERT_TRUE(statusMap.size() == devices.size());
1255         for (const auto &pair : statusMap) {
1256             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1257             EXPECT_TRUE(pair.second == OK);
1258         }
1259     }, query, false);
1260     ASSERT_TRUE(status == OK);
1261     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1262     EXPECT_EQ(sendRequestCount, 0);
1263 }
1264 
1265 /**
1266  * @tc.name: QuerySyncMergeCheck002
1267  * @tc.desc: Test query push sync task merge, task can not be merged when there is change in db since last sync
1268  * @tc.type: FUNC
1269  * @tc.require: AR000F3OOV
1270  * @tc.author: zhangshijie
1271  */
1272 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck002, TestSize.Level3)
1273 {
1274     std::vector<std::string> devices;
1275     int sendRequestCount = 0;
1276     devices.push_back(g_deviceB->GetDeviceId());
1277 
1278     Key key {'1'};
1279     Value value {'1'};
1280     Query query = Query::Select().PrefixKey(key);
1281     PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1282 
1283     /**
1284      * @tc.steps: step3. deviceA call query sync and don't wait
1285      * @tc.expected: step3. sync should return OK.
1286      */
1287     Value value3{'3'};
1288     DBStatus status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon5f4525661902(const std::map<std::string, DBStatus>& statusMap) 1289         [&sendRequestCount, devices, key, value3, this](const std::map<std::string, DBStatus>& statusMap) {
1290         /**
1291          * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1292          * skipped when there is change in db since last query sync, deviceB have {k1, v1'}
1293          */
1294         ASSERT_TRUE(statusMap.size() == devices.size());
1295         for (const auto &pair : statusMap) {
1296             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1297             EXPECT_TRUE(pair.second == OK);
1298         }
1299         VirtualDataItem item;
1300         g_deviceB->GetData(key, item);
1301         EXPECT_TRUE(item.value == value3);
1302         EXPECT_EQ(sendRequestCount, 1);
1303         }, query, false);
1304     ASSERT_TRUE(status == OK);
1305 
1306     /**
1307      * @tc.steps: step4. deviceA put {k1, v1'}
1308      * @tc.steps: step4. reset sendRequestCount to 0, deviceA call sync and wait
1309      * @tc.expected: step4. sync should return OK, and sendRequestCount should be 1, because this merge can not
1310      * be skipped
1311      */
1312     while (sendRequestCount < TWO_CNT) {
1313         std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1314     }
1315     g_kvDelegatePtr->Put(key, value3);
1316     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1317 }
1318 
1319 /**
1320  * @tc.name: QuerySyncMergeCheck003
1321  * @tc.desc: Test query push sync task merge, task can not be merged when then query id is different
1322  * @tc.type: FUNC
1323  * @tc.require: AR000F3OOV
1324  * @tc.author: zhangshijie
1325  */
1326 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck003, TestSize.Level3)
1327 {
1328     std::vector<std::string> devices;
1329     int sendRequestCount = 0;
1330     devices.push_back(g_deviceB->GetDeviceId());
1331 
1332     Key key {'1'};
1333     Value value {'1'};
1334     PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1335 
1336     /**
1337      * @tc.steps: step3.  deviceA call another query sync
1338      * @tc.expected: step3. sync should return OK.
1339      */
1340     Key key2 = {'2'};
1341     Value value2 = {'2'};
1342     DBStatus status = g_kvDelegatePtr->Put(key2, value2);
1343     ASSERT_TRUE(status == OK);
1344     Query query2 = Query::Select().PrefixKey(key2);
1345     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon5f4525661a02(const std::map<std::string, DBStatus>& statusMap) 1346         [&sendRequestCount, key2, value2, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1347         /**
1348          * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1349          * skipped, deviceB have {k2,v2}
1350          */
1351         ASSERT_TRUE(statusMap.size() == devices.size());
1352         for (const auto &pair : statusMap) {
1353             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1354             EXPECT_TRUE(pair.second == OK);
1355         }
1356         VirtualDataItem item;
1357         g_deviceB->GetData(key2, item);
1358         EXPECT_TRUE(item.value == value2);
1359         EXPECT_EQ(sendRequestCount, 1);
1360         }, query2, false);
1361     ASSERT_TRUE(status == OK);
1362     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1363 }
1364 
1365 /**
1366 * @tc.name: QuerySyncMergeCheck004
1367 * @tc.desc: Test query push sync task merge, task can be merged when there is no change in db since last push sync
1368 * @tc.type: FUNC
1369 * @tc.require: AR000F3OOV
1370 * @tc.author: zhangshijie
1371 */
1372 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck004, TestSize.Level3)
1373 {
1374     DBStatus status = OK;
1375     std::vector<std::string> devices;
1376     devices.push_back(g_deviceB->GetDeviceId());
1377 
1378     Key key {'1'};
1379     Value value {'1'};
1380     int sendRequestCount = 0;
1381     PrePareForQuerySyncMergeTest(false, devices, key, value, sendRequestCount);
1382 
1383     /**
1384      * @tc.steps: step3. deviceA call query sync without any change in db
1385      * @tc.expected: step3. sync should return OK, and sendRequestCount should be 0, because this merge can be skipped
1386      */
1387     Query query = Query::Select().PrefixKey(key);
1388     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon5f4525661b02(const std::map<std::string, DBStatus>& statusMap) 1389         [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1390             /**
1391              * @tc.expected step3: when the second sync task return, sendRequestCount should be 0, because this merge
1392              * can be skipped because there is no change in db since last push sync
1393              */
1394             ASSERT_TRUE(statusMap.size() == devices.size());
1395             for (const auto &pair : statusMap) {
1396                 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1397                 EXPECT_TRUE(pair.second == OK);
1398             }
1399         }, query, false);
1400     ASSERT_TRUE(status == OK);
1401     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1402     EXPECT_EQ(sendRequestCount, 0);
1403 }
1404 
1405 /**
1406   * @tc.name: GetDataNotify001
1407   * @tc.desc: Test GetDataNotify function, delay < 30s should sync ok, > 36 should timeout
1408   * @tc.type: FUNC
1409   * @tc.require: AR000D4876
1410   * @tc.author: zhangqiquan
1411   */
1412 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify001, TestSize.Level3)
1413 {
1414     ASSERT_NE(g_kvDelegatePtr, nullptr);
1415     DBStatus status = OK;
1416     std::vector<std::string> devices;
1417     devices.push_back(g_deviceB->GetDeviceId());
1418     const std::string DEVICE_A = "real_device";
1419     /**
1420      * @tc.steps: step1. deviceB set get data delay 40s
1421      */
1422     g_deviceB->DelayGetSyncData(WAIT_40_SECONDS);
1423     g_communicatorAggregator->SetTimeout(DEVICE_A, TIMEOUT_6_SECONDS);
1424 
1425     /**
1426      * @tc.steps: step2. deviceA call sync and wait
1427      * @tc.expected: step2. sync should return OK. onComplete should be called, deviceB sync TIME_OUT.
1428      */
1429     std::map<std::string, DBStatus> result;
1430     std::map<std::string, int> virtualRes;
1431     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1432     EXPECT_EQ(status, OK);
1433     EXPECT_EQ(result.size(), devices.size());
1434     EXPECT_EQ(result[DEVICE_B], TIME_OUT);
1435     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1436     Query query = Query::Select();
__anon5f4525661c02(std::map<std::string, int> resMap) 1437     g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1438         virtualRes = std::move(resMap);
1439     }, true);
1440     EXPECT_EQ(virtualRes.size(), devices.size());
1441     EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_TIMEOUT));
1442     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1443 
1444     /**
1445      * @tc.steps: step3. deviceB set get data delay 30s
1446      */
1447     g_deviceB->DelayGetSyncData(WAIT_30_SECONDS);
1448     /**
1449      * @tc.steps: step4. deviceA call sync and wait
1450      * @tc.expected: step4. sync should return OK. onComplete should be called, deviceB sync OK.
1451      */
1452     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1453     EXPECT_EQ(status, OK);
1454     EXPECT_EQ(result.size(), devices.size());
1455     EXPECT_EQ(result[DEVICE_B], OK);
1456     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
__anon5f4525661d02(std::map<std::string, int> resMap) 1457     g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1458         virtualRes = std::move(resMap);
1459     }, true);
1460     EXPECT_EQ(virtualRes.size(), devices.size());
1461     EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_FINISHED_ALL));
1462     g_deviceB->DelayGetSyncData(0);
1463 }
1464 
1465 /**
1466   * @tc.name: GetDataNotify002
1467   * @tc.desc: Test GetDataNotify function, two device sync each other at same time
1468   * @tc.type: FUNC
1469   * @tc.require: AR000D4876
1470   * @tc.author: zhangqiquan
1471   */
1472 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify002, TestSize.Level3)
1473 {
1474     ASSERT_NE(g_kvDelegatePtr, nullptr);
1475     DBStatus status = OK;
1476     std::vector<std::string> devices;
1477     devices.push_back(g_deviceB->GetDeviceId());
1478     const std::string DEVICE_A = "real_device";
1479 
1480     /**
1481      * @tc.steps: step1. deviceA sync first to finish time sync and ability sync
1482      */
1483     std::map<std::string, DBStatus> result;
1484     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1485     EXPECT_EQ(status, OK);
1486     EXPECT_EQ(result.size(), devices.size());
1487     EXPECT_EQ(result[DEVICE_B], OK);
1488     /**
1489      * @tc.steps: step2. deviceB set get data delay 30s
1490      */
1491     g_deviceB->DelayGetSyncData(WAIT_30_SECONDS);
1492 
1493     /**
1494      * @tc.steps: step3. deviceB call sync and wait
1495      */
__anon5f4525661e02() 1496     std::thread asyncThread([]() {
1497         std::map<std::string, int> virtualRes;
1498         Query query = Query::Select();
1499         g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1500                 virtualRes = std::move(resMap);
1501             }, true);
1502     });
1503 
1504     /**
1505      * @tc.steps: step4. deviceA call sync and wait
1506      * @tc.expected: step4. sync should return OK. because notify timer trigger (30s - 1s)/2s => 15times
1507      */
1508     std::this_thread::sleep_for(std::chrono::seconds(1));
1509     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1510     EXPECT_EQ(status, OK);
1511     EXPECT_EQ(result.size(), devices.size());
1512     EXPECT_EQ(result[DEVICE_B], OK);
1513     asyncThread.join();
1514     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1515 }