• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 
18 #include "distributeddb_data_generate_unit_test.h"
19 #include "distributeddb_tools_unit_test.h"
20 #include "kv_virtual_device.h"
21 #include "platform_specific.h"
22 #include "process_system_api_adapter_impl.h"
23 #include "single_ver_data_packet.h"
24 #include "virtual_communicator_aggregator.h"
25 
26 using namespace testing::ext;
27 using namespace DistributedDB;
28 using namespace DistributedDBUnitTest;
29 using namespace std;
30 
31 namespace {
32     string g_testDir;
33     const string STORE_ID = "kv_stroe_sync_check_test";
34     const std::string DEVICE_B = "deviceB";
35     const std::string DEVICE_C = "deviceC";
36     const int LOCAL_WATER_MARK_NOT_INIT = 0xaa;
37     const int EIGHT_HUNDRED = 800;
38     const int NORMAL_SYNC_SEND_REQUEST_CNT = 3;
39     const int TWO_CNT = 2;
40     const int SLEEP_MILLISECONDS = 500;
41     const int TEN_SECONDS = 10;
42     const int THREE_HUNDRED = 300;
43     const int WAIT_30_SECONDS = 30000;
44     const int WAIT_40_SECONDS = 40000;
45     const int TIMEOUT_6_SECONDS = 6000;
46 
47     KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
48     KvStoreConfig g_config;
49     DistributedDBToolsUnitTest g_tool;
50     DBStatus g_kvDelegateStatus = INVALID_ARGS;
51     KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
52     VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
53     KvVirtualDevice* g_deviceB = nullptr;
54     KvVirtualDevice* g_deviceC = nullptr;
55     VirtualSingleVerSyncDBInterface *g_syncInterfaceB = nullptr;
56     VirtualSingleVerSyncDBInterface *g_syncInterfaceC = nullptr;
57 
58     // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
59     auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
60         placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
61 #ifndef LOW_LEVEL_MEM_DEV
62     const int KEY_LEN = 20; // 20 Bytes
63     const int VALUE_LEN = 4 * 1024 * 1024; // 4MB
64     const int ENTRY_NUM = 2; // 16 entries
65 #endif
66 }
67 
68 class DistributedDBSingleVerP2PSyncCheckTest : public testing::Test {
69 public:
70     static void SetUpTestCase(void);
71     static void TearDownTestCase(void);
72     void SetUp();
73     void TearDown();
74 };
75 
SetUpTestCase(void)76 void DistributedDBSingleVerP2PSyncCheckTest::SetUpTestCase(void)
77 {
78     /**
79      * @tc.setup: Init datadir and Virtual Communicator.
80      */
81     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
82     g_config.dataDir = g_testDir;
83     g_mgr.SetKvStoreConfig(g_config);
84 
85     string dir = g_testDir + "/single_ver";
86     DIR* dirTmp = opendir(dir.c_str());
87     if (dirTmp == nullptr) {
88         OS::MakeDBDirectory(dir);
89     } else {
90         closedir(dirTmp);
91     }
92 
93     g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
94     ASSERT_TRUE(g_communicatorAggregator != nullptr);
95     RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
96 
97     std::shared_ptr<ProcessSystemApiAdapterImpl> g_adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
98     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(g_adapter);
99 }
100 
TearDownTestCase(void)101 void DistributedDBSingleVerP2PSyncCheckTest::TearDownTestCase(void)
102 {
103     /**
104      * @tc.teardown: Release virtual Communicator and clear data dir.
105      */
106     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
107         LOGE("rm test db files error!");
108     }
109     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
110     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
111 }
112 
SetUp(void)113 void DistributedDBSingleVerP2PSyncCheckTest::SetUp(void)
114 {
115     DistributedDBToolsUnitTest::PrintTestCaseInfo();
116     /**
117      * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
118      */
119     KvStoreNbDelegate::Option option;
120     option.secOption.securityLabel = SecurityLabel::S3;
121     option.secOption.securityFlag = SecurityFlag::SECE;
122     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
123     ASSERT_TRUE(g_kvDelegateStatus == OK);
124     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
125     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
126     ASSERT_TRUE(g_deviceB != nullptr);
127     g_syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
128     ASSERT_TRUE(g_syncInterfaceB != nullptr);
129     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, g_syncInterfaceB), E_OK);
130 
131     g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
132     ASSERT_TRUE(g_deviceC != nullptr);
133     g_syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
134     ASSERT_TRUE(g_syncInterfaceC != nullptr);
135     ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, g_syncInterfaceC), E_OK);
136 }
137 
TearDown(void)138 void DistributedDBSingleVerP2PSyncCheckTest::TearDown(void)
139 {
140     /**
141      * @tc.teardown: Release device A, B, C
142      */
143     if (g_kvDelegatePtr != nullptr) {
144         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
145         g_kvDelegatePtr = nullptr;
146         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
147         LOGD("delete kv store status %d", status);
148         ASSERT_TRUE(status == OK);
149     }
150     if (g_deviceB != nullptr) {
151         delete g_deviceB;
152         g_deviceB = nullptr;
153     }
154     if (g_deviceC != nullptr) {
155         delete g_deviceC;
156         g_deviceC = nullptr;
157     }
158     if (g_communicatorAggregator != nullptr) {
159         g_communicatorAggregator->RegOnDispatch(nullptr);
160     }
161 }
162 
163 /**
164  * @tc.name: sec option check Sync 001
165  * @tc.desc: if sec option not equal, forbid sync
166  * @tc.type: FUNC
167  * @tc.require: AR000EV1G6
168  * @tc.author: wangchuanqing
169  */
170 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck001, TestSize.Level1)
171 {
172     DBStatus status = OK;
173     std::vector<std::string> devices;
174     devices.push_back(g_deviceB->GetDeviceId());
175     devices.push_back(g_deviceC->GetDeviceId());
176 
177     /**
178      * @tc.steps: step1. deviceA put {k1, v1}
179      */
180     Key key = {'1'};
181     Value value = {'1'};
182     status = g_kvDelegatePtr->Put(key, value);
183     ASSERT_TRUE(status == OK);
184 
185     ASSERT_TRUE(g_syncInterfaceB != nullptr);
186     ASSERT_TRUE(g_syncInterfaceC != nullptr);
187     SecurityOption secOption{SecurityLabel::S4, SecurityFlag::ECE};
188     g_syncInterfaceB->SetSecurityOption(secOption);
189     g_syncInterfaceC->SetSecurityOption(secOption);
190 
191     /**
192      * @tc.steps: step2. deviceA call sync and wait
193      * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
194      */
195     std::map<std::string, DBStatus> result;
196     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
197     ASSERT_TRUE(status == OK);
198 
199     ASSERT_TRUE(result.size() == devices.size());
200     for (const auto &pair : result) {
201         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
202         EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
203     }
204     VirtualDataItem item;
205     g_deviceB->GetData(key, item);
206     EXPECT_TRUE(item.value.empty());
207     g_deviceC->GetData(key, item);
208     EXPECT_TRUE(item.value.empty());
209 }
210 
211 /**
212  * @tc.name: sec option check Sync 002
213  * @tc.desc: if sec option not equal, forbid sync
214  * @tc.type: FUNC
215  * @tc.require: AR000EV1G6
216  * @tc.author: wangchuanqing
217  */
218 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck002, TestSize.Level1)
219 {
220     DBStatus status = OK;
221     std::vector<std::string> devices;
222     devices.push_back(g_deviceB->GetDeviceId());
223     devices.push_back(g_deviceC->GetDeviceId());
224 
225     /**
226      * @tc.steps: step1. deviceA put {k1, v1}
227      */
228     Key key = {'1'};
229     Value value = {'1'};
230     status = g_kvDelegatePtr->Put(key, value);
231     ASSERT_TRUE(status == OK);
232 
233     ASSERT_TRUE(g_syncInterfaceC != nullptr);
234     SecurityOption secOption{SecurityLabel::S4, SecurityFlag::ECE};
235     g_syncInterfaceC->SetSecurityOption(secOption);
236     secOption.securityLabel = SecurityLabel::S3;
237     secOption.securityFlag = SecurityFlag::SECE;
238     g_syncInterfaceB->SetSecurityOption(secOption);
239 
240     /**
241      * @tc.steps: step2. deviceA call sync and wait
242      * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
243      */
244     std::map<std::string, DBStatus> result;
245     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
246     ASSERT_TRUE(status == OK);
247 
248     ASSERT_TRUE(result.size() == devices.size());
249     for (const auto &pair : result) {
250         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
251         if (pair.first == DEVICE_B) {
252             EXPECT_TRUE(pair.second == OK);
253         } else {
254             EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
255         }
256     }
257     VirtualDataItem item;
258     g_deviceC->GetData(key, item);
259     EXPECT_TRUE(item.value.empty());
260     g_deviceB->GetData(key, item);
261     EXPECT_TRUE(item.value == value);
262 }
263 
264 #ifndef LOW_LEVEL_MEM_DEV
265 /**
266  * @tc.name: BigDataSync001
267  * @tc.desc: big data sync push mode.
268  * @tc.type: FUNC
269  * @tc.require: AR000F3OOU
270  * @tc.author: wangchuanqing
271  */
272 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync001, TestSize.Level1)
273 {
274     DBStatus status = OK;
275     std::vector<std::string> devices;
276     devices.push_back(g_deviceB->GetDeviceId());
277     devices.push_back(g_deviceC->GetDeviceId());
278 
279     /**
280      * @tc.steps: step1. deviceA put 16 bigData
281      */
282     std::vector<Entry> entries;
283     std::vector<Key> keys;
284     DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
285     for (const auto &entry : entries) {
286         status = g_kvDelegatePtr->Put(entry.key, entry.value);
287         ASSERT_TRUE(status == OK);
288     }
289 
290     /**
291      * @tc.steps: step2. deviceA call sync and wait
292      * @tc.expected: step2. sync should return OK.
293      */
294     std::map<std::string, DBStatus> result;
295     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
296     ASSERT_TRUE(status == OK);
297 
298     /**
299      * @tc.expected: step2. onComplete should be called, DeviceB,C have {k1,v1}
300      */
301     ASSERT_TRUE(result.size() == devices.size());
302     for (const auto &pair : result) {
303         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
304         EXPECT_TRUE(pair.second == OK);
305     }
306     VirtualDataItem item;
307     for (const auto &entry : entries) {
308         item.value.clear();
309         g_deviceB->GetData(entry.key, item);
310         EXPECT_TRUE(item.value == entry.value);
311         item.value.clear();
312         g_deviceC->GetData(entry.key, item);
313         EXPECT_TRUE(item.value == entry.value);
314     }
315 }
316 
317 /**
318  * @tc.name: BigDataSync002
319  * @tc.desc: big data sync pull mode.
320  * @tc.type: FUNC
321  * @tc.require: AR000F3OOU
322  * @tc.author: wangchuanqing
323  */
324 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync002, TestSize.Level1)
325 {
326     DBStatus status = OK;
327     std::vector<std::string> devices;
328     devices.push_back(g_deviceB->GetDeviceId());
329     devices.push_back(g_deviceC->GetDeviceId());
330 
331     /**
332      * @tc.steps: step1. deviceA deviceB put bigData
333      */
334     std::vector<Entry> entries;
335     std::vector<Key> keys;
336     DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
337 
338     for (uint32_t i = 0; i < entries.size(); i++) {
339         if (i % 2 == 0) {
340             g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
341         } else {
342             g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
343         }
344     }
345 
346     /**
347      * @tc.steps: step3. deviceA call pull sync
348      * @tc.expected: step3. sync should return OK.
349      */
350     std::map<std::string, DBStatus> result;
351     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
352     ASSERT_TRUE(status == OK);
353 
354     /**
355      * @tc.expected: step3. onComplete should be called, DeviceA have all bigData
356      */
357     ASSERT_TRUE(result.size() == devices.size());
358     for (const auto &pair : result) {
359         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
360         EXPECT_TRUE(pair.second == OK);
361     }
362     for (const auto &entry : entries) {
363         Value value;
364         EXPECT_EQ(g_kvDelegatePtr->Get(entry.key, value), OK);
365         EXPECT_EQ(value, entry.value);
366     }
367 }
368 
369 /**
370  * @tc.name: BigDataSync003
371  * @tc.desc: big data sync pushAndPull mode.
372  * @tc.type: FUNC
373  * @tc.require: AR000F3OOV
374  * @tc.author: wangchuanqing
375  */
376 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync003, TestSize.Level1)
377 {
378     DBStatus status = OK;
379     std::vector<std::string> devices;
380     devices.push_back(g_deviceB->GetDeviceId());
381     devices.push_back(g_deviceC->GetDeviceId());
382 
383     /**
384      * @tc.steps: step1. deviceA deviceB put bigData
385      */
386     std::vector<Entry> entries;
387     std::vector<Key> keys;
388     DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
389 
390     for (uint32_t i = 0; i < entries.size(); i++) {
391         if (i % 3 == 0) { // 0 3 6 9 12 15 for deivec B
392             g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
393         } else if (i % 3 == 1) { // 1 4 7 10 13 16 for device C
394             g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
395         } else { // 2 5 8 11 14 for device A
396             status = g_kvDelegatePtr->Put(entries[i].key, entries[i].value);
397             ASSERT_TRUE(status == OK);
398         }
399     }
400 
401     /**
402      * @tc.steps: step3. deviceA call pushAndpull sync
403      * @tc.expected: step3. sync should return OK.
404      */
405     std::map<std::string, DBStatus> result;
406     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
407     ASSERT_TRUE(status == OK);
408 
409     /**
410      * @tc.expected: step3. onComplete should be called, DeviceA have all bigData
411      * deviceB and deviceC has deviceA data
412      */
413     ASSERT_TRUE(result.size() == devices.size());
414     for (const auto &pair : result) {
415         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
416         EXPECT_TRUE(pair.second == OK);
417     }
418 
419     VirtualDataItem item;
420     for (uint32_t i = 0; i < entries.size(); i++) {
421         Value value;
422         EXPECT_EQ(g_kvDelegatePtr->Get(entries[i].key, value), OK);
423         EXPECT_EQ(value, entries[i].value);
424 
425         if (i % 3 == 2) { // 2 5 8 11 14 for device A
426         item.value.clear();
427         g_deviceB->GetData(entries[i].key, item);
428         EXPECT_TRUE(item.value == entries[i].value);
429         item.value.clear();
430         g_deviceC->GetData(entries[i].key, item);
431         EXPECT_TRUE(item.value == entries[i].value);
432         }
433     }
434 }
435 #endif
436 
437 /**
438  * @tc.name: PushFinishedNotify 001
439  * @tc.desc: Test remote device push finished notify function.
440  * @tc.type: FUNC
441  * @tc.require: AR000CQS3S
442  * @tc.author: xushaohua
443  */
444 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, PushFinishedNotify001, TestSize.Level1)
445 {
446     std::vector<std::string> devices;
447     devices.push_back(g_deviceB->GetDeviceId());
448 
449     /**
450      * @tc.steps: step1. deviceA call SetRemotePushFinishedNotify
451      * @tc.expected: step1. set should return OK.
452      */
453     int pushfinishedFlag = 0;
454     DBStatus status = g_kvDelegatePtr->SetRemotePushFinishedNotify(
__anon0f1c56880202(const RemotePushNotifyInfo &info) 455         [&pushfinishedFlag](const RemotePushNotifyInfo &info) {
456             EXPECT_TRUE(info.deviceId == DEVICE_B);
457             pushfinishedFlag = 1;
458     });
459     ASSERT_EQ(status, OK);
460 
461     /**
462      * @tc.steps: step2. deviceB put k2, v2, and deviceA pull from deviceB
463      * @tc.expected: step2. deviceA can not receive push finished notify
464      */
465     EXPECT_EQ(g_kvDelegatePtr->Put(KEY_2, VALUE_2), OK);
466     std::map<std::string, DBStatus> result;
467     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
468     EXPECT_TRUE(status == OK);
469     EXPECT_EQ(pushfinishedFlag, 0);
470     pushfinishedFlag = 0;
471 
472     /**
473      * @tc.steps: step3. deviceB put k3, v3, and deviceA push and pull to deviceB
474      * @tc.expected: step3. deviceA can not receive push finished notify
475      */
476     EXPECT_EQ(g_kvDelegatePtr->Put(KEY_3, VALUE_3), OK);
477     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
478     EXPECT_TRUE(status == OK);
479     EXPECT_EQ(pushfinishedFlag, 0);
480     pushfinishedFlag = 0;
481 
482     /**
483      * @tc.steps: step4. deviceA call SetRemotePushFinishedNotify to reset notify
484      * @tc.expected: step4. set should return OK.
485      */
__anon0f1c56880302(const RemotePushNotifyInfo &info) 486     status = g_kvDelegatePtr->SetRemotePushFinishedNotify([&pushfinishedFlag](const RemotePushNotifyInfo &info) {
487         EXPECT_TRUE(info.deviceId == DEVICE_B);
488         pushfinishedFlag = 2;
489     });
490     ASSERT_EQ(status, OK);
491 
492     /**
493      * @tc.steps: step5. deviceA call SetRemotePushFinishedNotify set null to unregist
494      * @tc.expected: step5. set should return OK.
495      */
496     status = g_kvDelegatePtr->SetRemotePushFinishedNotify(nullptr);
497     ASSERT_EQ(status, OK);
498 }
499 
500 namespace {
RegOnDispatchWithDelayAck(bool & errCodeAck,bool & afterErrAck)501 void RegOnDispatchWithDelayAck(bool &errCodeAck, bool &afterErrAck)
502 {
503     // just delay the busy ack
504     g_communicatorAggregator->RegOnDispatch([&errCodeAck, &afterErrAck](const std::string &dev, Message *inMsg) {
505         if (dev != g_deviceB->GetDeviceId()) {
506             return;
507         }
508         auto *packet = inMsg->GetObject<DataAckPacket>();
509         if (packet->GetRecvCode() == -E_BUSY) {
510             errCodeAck = true;
511             while (!afterErrAck) {
512             }
513             LOGW("NOW SEND BUSY ACK");
514         } else if (errCodeAck) {
515             afterErrAck = true;
516             std::this_thread::sleep_for(std::chrono::seconds(1));
517         }
518     });
519 }
520 
RegOnDispatchWithOffline(bool & offlineFlag,bool & invalid,condition_variable & conditionOffline)521 void RegOnDispatchWithOffline(bool &offlineFlag, bool &invalid, condition_variable &conditionOffline)
522 {
523     g_communicatorAggregator->RegOnDispatch([&offlineFlag, &invalid, &conditionOffline](
524                                                 const std::string &dev, Message *inMsg) {
525         auto *packet = inMsg->GetObject<DataAckPacket>();
526         if (dev != DEVICE_B) {
527             if (packet->GetRecvCode() == LOCAL_WATER_MARK_NOT_INIT) {
528                 offlineFlag = true;
529                 conditionOffline.notify_all();
530                 LOGW("[Dispatch] NOTIFY OFFLINE");
531                 std::this_thread::sleep_for(std::chrono::microseconds(EIGHT_HUNDRED));
532             }
533         } else if (!invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
534             LOGW("[Dispatch] NOW INVALID THIS MSG");
535             inMsg->SetMessageType(TYPE_INVALID);
536             inMsg->SetMessageId(INVALID_MESSAGE_ID);
537             invalid = true;
538         }
539     });
540 }
541 
RegOnDispatchWithInvalidMsg(bool & invalid)542 void RegOnDispatchWithInvalidMsg(bool &invalid)
543 {
544     g_communicatorAggregator->RegOnDispatch([&invalid](
545         const std::string &dev, Message *inMsg) {
546         if (dev == DEVICE_B && !invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
547             LOGW("[Dispatch] NOW INVALID THIS MSG");
548             inMsg->SetMessageType(TYPE_INVALID);
549             inMsg->SetMessageId(INVALID_MESSAGE_ID);
550             invalid = true;
551         }
552     });
553 }
554 
PrepareEnv(vector<std::string> & devices,Key & key,Query & query)555 void PrepareEnv(vector<std::string> &devices, Key &key, Query &query)
556 {
557     /**
558      * @tc.steps: step1. ensure the watermark is no zero and finish timeSync and abilitySync
559      * @tc.expected: step1. should return OK.
560      */
561     Value value = {'1'};
562     std::map<std::string, DBStatus> result;
563     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
564 
565     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result, query);
566     EXPECT_TRUE(status == OK);
567     ASSERT_TRUE(result[g_deviceB->GetDeviceId()] == OK);
568 }
569 
Sync(vector<std::string> & devices,const DBStatus & targetStatus)570 void Sync(vector<std::string> &devices, const DBStatus &targetStatus)
571 {
572     std::map<std::string, DBStatus> result;
573     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result);
574     EXPECT_TRUE(status == OK);
575     for (const auto &deviceId : devices) {
576         ASSERT_TRUE(result[deviceId] == targetStatus);
577     }
578 }
579 
SyncWithQuery(vector<std::string> & devices,const Query & query,const SyncMode & mode,const DBStatus & targetStatus)580 void SyncWithQuery(vector<std::string> &devices, const Query &query, const SyncMode &mode,
581     const DBStatus &targetStatus)
582 {
583     std::map<std::string, DBStatus> result;
584     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, mode, result, query);
585     EXPECT_TRUE(status == OK);
586     for (const auto &deviceId : devices) {
587         ASSERT_TRUE(result[deviceId] == targetStatus);
588     }
589 }
590 
SyncWithQuery(vector<std::string> & devices,const Query & query,const DBStatus & targetStatus)591 void SyncWithQuery(vector<std::string> &devices, const Query &query, const DBStatus &targetStatus)
592 {
593     SyncWithQuery(devices, query, DistributedDB::SYNC_MODE_PUSH_ONLY, targetStatus);
594 }
595 
SyncWithDeviceOffline(vector<std::string> & devices,Key & key,const Query & query)596 void SyncWithDeviceOffline(vector<std::string> &devices, Key &key, const Query &query)
597 {
598     Value value = {'2'};
599     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
600 
601     /**
602      * @tc.steps: step2. invalid the sync msg
603      * @tc.expected: step2. should return TIME_OUT.
604      */
605     SyncWithQuery(devices, query, TIME_OUT);
606 
607     /**
608      * @tc.steps: step3. device offline when sync
609      * @tc.expected: step3. should return COMM_FAILURE.
610      */
611     SyncWithQuery(devices, query, COMM_FAILURE);
612 }
613 
PrepareWaterMarkError(std::vector<std::string> & devices,Query & query)614 void PrepareWaterMarkError(std::vector<std::string> &devices, Query &query)
615 {
616     /**
617      * @tc.steps: step1. prepare data
618      */
619     devices.push_back(g_deviceB->GetDeviceId());
620     g_deviceB->Online();
621 
622     Key key = {'1'};
623     query = Query::Select().PrefixKey(key);
624     PrepareEnv(devices, key, query);
625 
626     /**
627      * @tc.steps: step2. query sync and set queryWaterMark
628      * @tc.expected: step2. should return OK.
629      */
630     Value value = {'2'};
631     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
632     SyncWithQuery(devices, query, OK);
633 
634     /**
635      * @tc.steps: step3. sync and invalid msg for set local device waterMark
636      * @tc.expected: step3. should return TIME_OUT.
637      */
638     bool invalidMsg = false;
639     RegOnDispatchWithInvalidMsg(invalidMsg);
640     value = {'3'};
641     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
642     Sync(devices, TIME_OUT);
643     g_communicatorAggregator->RegOnDispatch(nullptr);
644 }
645 }
646 
647 /**
648  * @tc.name: AckSessionCheck 001
649  * @tc.desc: Test ack session check function.
650  * @tc.type: FUNC
651  * @tc.require: AR000F3OOV
652  * @tc.author: zhangqiquan
653  */
654 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, AckSessionCheck001, TestSize.Level3)
655 {
656     std::vector<std::string> devices;
657     devices.push_back(g_deviceB->GetDeviceId());
658 
659     /**
660      * @tc.steps: step1. deviceB sync to deviceA just for timeSync and abilitySync
661      * @tc.expected: step1. should return OK.
662      */
663     ASSERT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
664 
665     /**
666      * @tc.steps: step2. deviceA StartTransaction for prevent other sync action deviceB sync will fail
667      * @tc.expected: step2. should return OK.
668      */
669     ASSERT_TRUE(g_kvDelegatePtr->StartTransaction() == OK);
670 
671     bool errCodeAck = false;
672     bool afterErrAck = false;
673     RegOnDispatchWithDelayAck(errCodeAck, afterErrAck);
674 
675     Key key = {'1'};
676     Value value = {'1'};
677     Timestamp currentTime;
678     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
679     EXPECT_TRUE(g_deviceB->PutData(key, value, currentTime, 0) == OK);
680     EXPECT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
681 
682     Value outValue;
683     EXPECT_TRUE(g_kvDelegatePtr->Get(key, outValue) == NOT_FOUND);
684 
685     /**
686      * @tc.steps: step3. release the writeHandle and try again, sync success
687      * @tc.expected: step3. should return OK.
688      */
689     EXPECT_TRUE(g_kvDelegatePtr->Commit() == OK);
690     EXPECT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
691 
692     EXPECT_TRUE(g_kvDelegatePtr->Get(key, outValue) == E_OK);
693     EXPECT_EQ(outValue, value);
694 }
695 
696 /**
697  * @tc.name: AckSafeCheck001
698  * @tc.desc: Test ack session check filter all bad ack in device offline scene.
699  * @tc.type: FUNC
700  * @tc.require: AR000F3OOV
701  * @tc.author: zhangqiquan
702  */
703 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, AckSafeCheck001, TestSize.Level3)
704 {
705     std::vector<std::string> devices;
706     devices.push_back(g_deviceB->GetDeviceId());
707     g_deviceB->Online();
708 
709     Key key = {'1'};
710     Query query = Query::Select().PrefixKey(key);
711     PrepareEnv(devices, key, query);
712 
713     std::condition_variable conditionOnline;
714     std::condition_variable conditionOffline;
715     bool onlineFlag = false;
716     bool invalid = false;
717     bool offlineFlag = false;
__anon0f1c56880802() 718     thread subThread([&onlineFlag, &conditionOnline, &offlineFlag, &conditionOffline]() {
719         LOGW("[Dispatch] NOW DEVICES IS OFFLINE");
720         std::mutex offlineMtx;
721         std::unique_lock<std::mutex> lck(offlineMtx);
722         conditionOffline.wait(lck, [&offlineFlag]{ return offlineFlag; });
723         g_deviceB->Offline();
724         std::this_thread::sleep_for(std::chrono::milliseconds(100));
725         g_deviceB->Online();
726         onlineFlag = true;
727         conditionOnline.notify_all();
728         LOGW("[Dispatch] NOW DEVICES IS ONLINE");
729     });
730     subThread.detach();
731 
732     RegOnDispatchWithOffline(offlineFlag, invalid, conditionOffline);
733 
734     SyncWithDeviceOffline(devices, key, query);
735 
736     std::mutex onlineMtx;
737     std::unique_lock<std::mutex> lck(onlineMtx);
__anon0f1c56880a02null738     conditionOnline.wait(lck, [&onlineFlag]{ return onlineFlag; });
739 
740     /**
741      * @tc.steps: step4. sync again if has problem it will sync never end
742      * @tc.expected: step4. should return OK.
743      */
744     SyncWithQuery(devices, query, OK);
745 }
746 
747 /**
748  * @tc.name: WaterMarkCheck001
749  * @tc.desc: Test waterMark work correct in lost package scene.
750  * @tc.type: FUNC
751  * @tc.require: AR000F3OOV
752  * @tc.author: zhangqiquan
753  */
754 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, WaterMarkCheck001, TestSize.Level1)
755 {
756     std::vector<std::string> devices;
757     Query query = Query::Select();
758     PrepareWaterMarkError(devices, query);
759 
760     /**
761      * @tc.steps: step4. sync again see it work correct
762      * @tc.expected: step4. should return OK.
763      */
764     SyncWithQuery(devices, query, OK);
765 }
766 
767 /**
768  * @tc.name: WaterMarkCheck002
769  * @tc.desc: Test pull work correct in error waterMark scene.
770  * @tc.type: FUNC
771  * @tc.require: AR000F3OOV
772  * @tc.author: zhangqiquan
773  */
774 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, WaterMarkCheck002, TestSize.Level1)
775 {
776     std::vector<std::string> devices;
777     Query query = Query::Select();
778     PrepareWaterMarkError(devices, query);
779 
780     /**
781      * @tc.steps: step4. sync again see it work correct
782      * @tc.expected: step4. should return OK.
783      */
784     Key key = {'2'};
785     ASSERT_TRUE(g_kvDelegatePtr->Put(key, {}) == OK);
786     query = Query::Select();
787     SyncWithQuery(devices, query, DistributedDB::SYNC_MODE_PULL_ONLY, OK);
788 
789     VirtualDataItem item;
790     EXPECT_EQ(g_deviceB->GetData(key, item), -E_NOT_FOUND);
791 }
792 
RegOnDispatchToGetSyncCount(int & sendRequestCount,int sleepMs=0)793 void RegOnDispatchToGetSyncCount(int &sendRequestCount, int sleepMs = 0)
794 {
795     g_communicatorAggregator->RegOnDispatch([sleepMs, &sendRequestCount](
796             const std::string &dev, Message *inMsg) {
797         if (dev == DEVICE_B && inMsg->GetMessageType() == TYPE_REQUEST) {
798             std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
799             sendRequestCount++;
800             LOGD("sendRequestCount++...");
801         }
802     });
803 }
804 
TestDifferentSyncMode(SyncMode mode)805 void TestDifferentSyncMode(SyncMode mode)
806 {
807     std::vector<std::string> devices;
808     devices.push_back(g_deviceB->GetDeviceId());
809 
810     /**
811      * @tc.steps: step1. deviceA put {k1, v1}
812      */
813     Key key = {'1'};
814     Value value = {'1'};
815     DBStatus status = g_kvDelegatePtr->Put(key, value);
816     ASSERT_TRUE(status == OK);
817 
818     int sendRequestCount = 0;
819     RegOnDispatchToGetSyncCount(sendRequestCount);
820 
821     /**
822      * @tc.steps: step2. deviceA call sync and wait
823      * @tc.expected: step2. sync should return OK.
824      */
825     std::map<std::string, DBStatus> result;
826     status = g_tool.SyncTest(g_kvDelegatePtr, devices, mode, result);
827     ASSERT_TRUE(status == OK);
828 
829     /**
830      * @tc.expected: step2. onComplete should be called, DeviceB have {k1,v1}, send request message 3 times
831      */
832     ASSERT_TRUE(result.size() == devices.size());
833     for (const auto &pair : result) {
834         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
835         EXPECT_TRUE(pair.second == OK);
836     }
837     VirtualDataItem item;
838     g_deviceB->GetData(key, item);
839     EXPECT_TRUE(item.value == value);
840 
841     EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
842 
843     /**
844      * @tc.steps: step3. reset sendRequestCount to 0, deviceA call sync and wait again without any change in db
845      * @tc.expected: step3. sync should return OK, and sendRequestCount should be 1, because this merge can not
846      * be skipped
847      */
848     sendRequestCount = 0;
849     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
850     ASSERT_TRUE(status == OK);
851     EXPECT_EQ(sendRequestCount, 1);
852 }
853 
854 /**
855  * @tc.name: PushSyncMergeCheck001
856  * @tc.desc: Test push sync task merge, task can not be merged when the two sync task is not in the queue
857  * at the same time.
858  * @tc.type: FUNC
859  * @tc.require: AR000F3OOV
860  * @tc.author: zhangshijie
861  */
862 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck001, TestSize.Level1)
863 {
864     TestDifferentSyncMode(SYNC_MODE_PUSH_ONLY);
865 }
866 
867 /**
868  * @tc.name: PushSyncMergeCheck002
869  * @tc.desc: Test push_pull sync task merge, task can not be merged when the two sync task is not in the queue
870  * at the same time.
871  * @tc.type: FUNC
872  * @tc.require: AR000F3OOV
873  * @tc.author: zhangshijie
874  */
875 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck002, TestSize.Level1)
876 {
877     TestDifferentSyncMode(SYNC_MODE_PUSH_PULL);
878 }
879 
PrepareForSyncMergeTest(std::vector<std::string> & devices,int & sendRequestCount)880 void PrepareForSyncMergeTest(std::vector<std::string> &devices, int &sendRequestCount)
881 {
882     /**
883      * @tc.steps: step1. deviceA put {k1, v1}
884      */
885     Key key = {'1'};
886     Value value = {'1'};
887     DBStatus status = g_kvDelegatePtr->Put(key, value);
888     ASSERT_TRUE(status == OK);
889 
890     RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
891 
892     /**
893      * @tc.steps: step2. deviceA call sync and don't wait
894      * @tc.expected: step2. sync should return OK.
895      */
896     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
897         [&sendRequestCount, devices, key, value](const std::map<std::string, DBStatus>& statusMap) {
898         ASSERT_TRUE(statusMap.size() == devices.size());
899         for (const auto &pair : statusMap) {
900             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
901             EXPECT_TRUE(pair.second == OK);
902         }
903         VirtualDataItem item;
904         g_deviceB->GetData(key, item);
905         EXPECT_EQ(item.value, value);
906         EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
907 
908         // reset sendRequestCount to 0
909         sendRequestCount = 0;
910     });
911     ASSERT_TRUE(status == OK);
912 }
913 
914 /**
915  * @tc.name: PushSyncMergeCheck003
916  * @tc.desc: Test push sync task merge, task can not be merged when there is change in db since last push sync
917  * @tc.type: FUNC
918  * @tc.require: AR000F3OOV
919  * @tc.author: zhangshijie
920  */
921 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck003, TestSize.Level3)
922 {
923     DBStatus status = OK;
924     std::vector<std::string> devices;
925     devices.push_back(g_deviceB->GetDeviceId());
926 
927     int sendRequestCount = 0;
928     PrepareForSyncMergeTest(devices, sendRequestCount);
929 
930     /**
931      * @tc.steps: step3. deviceA call sync and don't wait
932      * @tc.expected: step3. sync should return OK.
933      */
934     Key key = {'1'};
935     Value value = {'2'};
936     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon0f1c56880d02(const std::map<std::string, DBStatus>& statusMap) 937         [&sendRequestCount, devices, key, value, this](const std::map<std::string, DBStatus>& statusMap) {
938         /**
939          * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
940          * skipped, but it is no need to do time sync and ability sync, only need to do data sync
941          */
942         ASSERT_TRUE(statusMap.size() == devices.size());
943         for (const auto &pair : statusMap) {
944             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
945             EXPECT_TRUE(pair.second == OK);
946         }
947         VirtualDataItem item;
948         g_deviceB->GetData(key, item);
949         EXPECT_EQ(item.value, value);
950     });
951     ASSERT_TRUE(status == OK);
952 
953     /**
954      * @tc.steps: step4. deviceA put {k1, v2}
955      */
956     while (sendRequestCount < TWO_CNT) {
957         std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
958     }
959     status = g_kvDelegatePtr->Put(key, value);
960     ASSERT_TRUE(status == OK);
961     // wait for the second sync task finish
962     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
963     EXPECT_EQ(sendRequestCount, 1);
964 }
965 
966 /**
967  * @tc.name: PushSyncMergeCheck004
968  * @tc.desc: Test push sync task merge, task can be merged when there is no change in db since last push sync
969  * @tc.type: FUNC
970  * @tc.require: AR000F3OOV
971  * @tc.author: zhangshijie
972  */
973 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck004, TestSize.Level3)
974 {
975     DBStatus status = OK;
976     std::vector<std::string> devices;
977     devices.push_back(g_deviceB->GetDeviceId());
978 
979     int sendRequestCount = 0;
980     PrepareForSyncMergeTest(devices, sendRequestCount);
981 
982     /**
983      * @tc.steps: step3. deviceA call sync and don't wait
984      * @tc.expected: step3. sync should return OK.
985      */
986     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon0f1c56880e02(const std::map<std::string, DBStatus>& statusMap) 987         [devices, this](const std::map<std::string, DBStatus>& statusMap) {
988         /**
989          * @tc.expected: when the second sync task return, sendRequestCount should be 0, because this merge can  be
990          * skipped
991          */
992         ASSERT_TRUE(statusMap.size() == devices.size());
993         for (const auto &pair : statusMap) {
994             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
995             EXPECT_TRUE(pair.second == OK);
996         }
997     });
998     ASSERT_TRUE(status == OK);
999     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1000     EXPECT_EQ(sendRequestCount, 0);
1001 }
1002 
RegOnDispatchWithInvalidMsgAndCnt(int & sendRequestCount,int sleepMs,bool & invalid)1003 void RegOnDispatchWithInvalidMsgAndCnt(int &sendRequestCount, int sleepMs, bool &invalid)
1004 {
1005     g_communicatorAggregator->RegOnDispatch([&sendRequestCount, sleepMs, &invalid](
1006         const std::string &dev, Message *inMsg) {
1007         if (dev == DEVICE_B && !invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
1008             inMsg->SetMessageType(TYPE_INVALID);
1009             inMsg->SetMessageId(INVALID_MESSAGE_ID);
1010             sendRequestCount++;
1011             invalid = true;
1012             LOGW("[Dispatch]invalid THIS MSG, sendRequestCount = %d", sendRequestCount);
1013             std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
1014         }
1015     });
1016 }
1017 
1018 /**
1019  * @tc.name: PushSyncMergeCheck005
1020  * @tc.desc: Test push sync task merge, task cannot be merged when the last push sync is failed
1021  * @tc.type: FUNC
1022  * @tc.require: AR000F3OOV
1023  * @tc.author: zhangshijie
1024  */
1025 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck005, TestSize.Level3)
1026 {
1027     DBStatus status = OK;
1028     std::vector<std::string> devices;
1029     devices.push_back(g_deviceB->GetDeviceId());
1030 
1031     /**
1032      * @tc.steps: step1. deviceA put {k1, v1}
1033      */
1034     Key key = {'1'};
1035     Value value = {'1'};
1036     status = g_kvDelegatePtr->Put(key, value);
1037     ASSERT_TRUE(status == OK);
1038 
1039     int sendRequestCount = 0;
1040     bool invalid = false;
1041     RegOnDispatchWithInvalidMsgAndCnt(sendRequestCount, SLEEP_MILLISECONDS, invalid);
1042 
1043     /**
1044      * @tc.steps: step2. deviceA call sync and don't wait
1045      * @tc.expected: step2. sync should return TIME_OUT.
1046      */
1047     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon0f1c56881002(const std::map<std::string, DBStatus>& statusMap) 1048         [&sendRequestCount, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1049         ASSERT_TRUE(statusMap.size() == devices.size());
1050         for (const auto &deviceId : devices) {
1051             ASSERT_EQ(statusMap.at(deviceId), TIME_OUT);
1052         }
1053     });
1054     EXPECT_TRUE(status == OK);
1055 
1056     /**
1057      * @tc.steps: step3. deviceA call sync and don't wait
1058      * @tc.expected: step3. sync should return OK.
1059      */
1060     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon0f1c56881102(const std::map<std::string, DBStatus>& statusMap) 1061         [key, value, &sendRequestCount, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1062         /**
1063          * @tc.expected: when the second sync task return, sendRequestCount should be 3, because this merge can not be
1064          * skipped, deviceB should have {k1, v1}.
1065          */
1066         ASSERT_TRUE(statusMap.size() == devices.size());
1067         for (const auto &pair : statusMap) {
1068             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1069             EXPECT_EQ(pair.second, OK);
1070         }
1071         VirtualDataItem item;
1072         g_deviceB->GetData(key, item);
1073         EXPECT_EQ(item.value, value);
1074     });
1075     ASSERT_TRUE(status == OK);
1076     while (sendRequestCount < 1) {
1077         std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1078     }
1079     sendRequestCount = 0;
1080     RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1081 
1082     // wait for the second sync task finish
1083     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1084     EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1085 }
1086 
PrePareForQuerySyncMergeTest(bool isQuerySync,std::vector<std::string> & devices,Key & key,Value & value,int & sendRequestCount)1087 void PrePareForQuerySyncMergeTest(bool isQuerySync, std::vector<std::string> &devices,
1088     Key &key, Value &value, int &sendRequestCount)
1089 {
1090     DBStatus status = OK;
1091     /**
1092      * @tc.steps: step1. deviceA put {k1, v1}...{k10, v10}
1093      */
1094     Query query = Query::Select().PrefixKey(key);
1095     const int dataSize = 10;
1096     for (int i = 0; i < dataSize; i++) {
1097         key.push_back(i);
1098         value.push_back(i);
1099         status = g_kvDelegatePtr->Put(key, value);
1100         ASSERT_TRUE(status == OK);
1101         key.pop_back();
1102         value.pop_back();
1103     }
1104 
1105     RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1106     /**
1107      * @tc.steps: step2. deviceA call query sync and don't wait
1108      * @tc.expected: step2. sync should return OK.
1109      */
1110     auto completeCallBack = [&sendRequestCount, &key, &value, dataSize, devices]
1111         (const std::map<std::string, DBStatus>& statusMap) {
1112         ASSERT_TRUE(statusMap.size() == devices.size());
1113         for (const auto &pair : statusMap) {
1114             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1115             EXPECT_EQ(pair.second, OK);
1116         }
1117         // when first sync finish, DeviceB have {k1,v1}, {k3,v3}, {k5,v5} .. send request message 3 times
1118         VirtualDataItem item;
1119         for (int i = 0; i < dataSize; i++) {
1120             key.push_back(i);
1121             value.push_back(i);
1122             g_deviceB->GetData(key, item);
1123             EXPECT_EQ(item.value, value);
1124             key.pop_back();
1125             value.pop_back();
1126         }
1127         EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1128         // reset sendRequestCount to 0
1129         sendRequestCount = 0;
1130     };
1131     if (isQuerySync) {
1132         status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, completeCallBack, query, false);
1133     } else {
1134         status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, completeCallBack);
1135     }
1136     ASSERT_TRUE(status == OK);
1137 }
1138 
1139 /**
1140  * @tc.name: QuerySyncMergeCheck001
1141  * @tc.desc: Test query push sync task merge, task can be merged when there is no change in db since last query sync
1142  * @tc.type: FUNC
1143  * @tc.require: AR000F3OOV
1144  * @tc.author: zhangshijie
1145  */
1146 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck001, TestSize.Level3)
1147 {
1148     std::vector<std::string> devices;
1149     int sendRequestCount = 0;
1150     devices.push_back(g_deviceB->GetDeviceId());
1151 
1152     Key key {'1'};
1153     Value value {'1'};
1154     Query query = Query::Select().PrefixKey(key);
1155     PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1156 
1157     /**
1158      * @tc.steps: step3. deviceA call query sync and don't wait
1159      * @tc.expected: step3. sync should return OK.
1160      */
1161     DBStatus status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon0f1c56881302(const std::map<std::string, DBStatus>& statusMap) 1162         [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1163         /**
1164          * @tc.expected: when the second sync task return, sendRequestCount should be 0, because this merge can be
1165          * skipped because there is no change in db since last query sync
1166          */
1167         ASSERT_TRUE(statusMap.size() == devices.size());
1168         for (const auto &pair : statusMap) {
1169             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1170             EXPECT_TRUE(pair.second == OK);
1171         }
1172     }, query, false);
1173     ASSERT_TRUE(status == OK);
1174     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1175     EXPECT_EQ(sendRequestCount, 0);
1176 }
1177 
1178 /**
1179  * @tc.name: QuerySyncMergeCheck002
1180  * @tc.desc: Test query push sync task merge, task can not be merged when there is change in db since last sync
1181  * @tc.type: FUNC
1182  * @tc.require: AR000F3OOV
1183  * @tc.author: zhangshijie
1184  */
1185 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck002, TestSize.Level3)
1186 {
1187     std::vector<std::string> devices;
1188     int sendRequestCount = 0;
1189     devices.push_back(g_deviceB->GetDeviceId());
1190 
1191     Key key {'1'};
1192     Value value {'1'};
1193     Query query = Query::Select().PrefixKey(key);
1194     PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1195 
1196     /**
1197      * @tc.steps: step3. deviceA call query sync and don't wait
1198      * @tc.expected: step3. sync should return OK.
1199      */
1200     Value value3{'3'};
1201     DBStatus status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon0f1c56881402(const std::map<std::string, DBStatus>& statusMap) 1202         [&sendRequestCount, devices, key, value3, this](const std::map<std::string, DBStatus>& statusMap) {
1203         /**
1204          * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1205          * skipped when there is change in db since last query sync, deviceB have {k1, v1'}
1206          */
1207         ASSERT_TRUE(statusMap.size() == devices.size());
1208         for (const auto &pair : statusMap) {
1209             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1210             EXPECT_TRUE(pair.second == OK);
1211         }
1212         VirtualDataItem item;
1213         g_deviceB->GetData(key, item);
1214         EXPECT_TRUE(item.value == value3);
1215         EXPECT_EQ(sendRequestCount, 1);
1216         }, query, false);
1217     ASSERT_TRUE(status == OK);
1218 
1219     /**
1220      * @tc.steps: step4. deviceA put {k1, v1'}
1221      * @tc.steps: step4. reset sendRequestCount to 0, deviceA call sync and wait
1222      * @tc.expected: step4. sync should return OK, and sendRequestCount should be 1, because this merge can not
1223      * be skipped
1224      */
1225     while (sendRequestCount < TWO_CNT) {
1226         std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1227     }
1228     g_kvDelegatePtr->Put(key, value3);
1229     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1230 }
1231 
1232 /**
1233  * @tc.name: QuerySyncMergeCheck003
1234  * @tc.desc: Test query push sync task merge, task can not be merged when then query id is different
1235  * @tc.type: FUNC
1236  * @tc.require: AR000F3OOV
1237  * @tc.author: zhangshijie
1238  */
1239 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck003, TestSize.Level3)
1240 {
1241     std::vector<std::string> devices;
1242     int sendRequestCount = 0;
1243     devices.push_back(g_deviceB->GetDeviceId());
1244 
1245     Key key {'1'};
1246     Value value {'1'};
1247     PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1248 
1249     /**
1250      * @tc.steps: step3.  deviceA call another query sync
1251      * @tc.expected: step3. sync should return OK.
1252      */
1253     Key key2 = {'2'};
1254     Value value2 = {'2'};
1255     DBStatus status = g_kvDelegatePtr->Put(key2, value2);
1256     ASSERT_TRUE(status == OK);
1257     Query query2 = Query::Select().PrefixKey(key2);
1258     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon0f1c56881502(const std::map<std::string, DBStatus>& statusMap) 1259         [&sendRequestCount, key2, value2, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1260         /**
1261          * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1262          * skipped, deviceB have {k2,v2}
1263          */
1264         ASSERT_TRUE(statusMap.size() == devices.size());
1265         for (const auto &pair : statusMap) {
1266             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1267             EXPECT_TRUE(pair.second == OK);
1268         }
1269         VirtualDataItem item;
1270         g_deviceB->GetData(key2, item);
1271         EXPECT_TRUE(item.value == value2);
1272         EXPECT_EQ(sendRequestCount, 1);
1273         }, query2, false);
1274     ASSERT_TRUE(status == OK);
1275     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1276 }
1277 
1278 /**
1279 * @tc.name: QuerySyncMergeCheck004
1280 * @tc.desc: Test query push sync task merge, task can be merged when there is no change in db since last push sync
1281 * @tc.type: FUNC
1282 * @tc.require: AR000F3OOV
1283 * @tc.author: zhangshijie
1284 */
1285 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck004, TestSize.Level3)
1286 {
1287     DBStatus status = OK;
1288     std::vector<std::string> devices;
1289     devices.push_back(g_deviceB->GetDeviceId());
1290 
1291     Key key {'1'};
1292     Value value {'1'};
1293     int sendRequestCount = 0;
1294     PrePareForQuerySyncMergeTest(false, devices, key, value, sendRequestCount);
1295 
1296     /**
1297      * @tc.steps: step3. deviceA call query sync without any change in db
1298      * @tc.expected: step3. sync should return OK, and sendRequestCount should be 0, because this merge can be skipped
1299      */
1300     Query query = Query::Select().PrefixKey(key);
1301     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon0f1c56881602(const std::map<std::string, DBStatus>& statusMap) 1302         [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1303             /**
1304              * @tc.expected step3: when the second sync task return, sendRequestCount should be 0, because this merge
1305              * can be skipped because there is no change in db since last push sync
1306              */
1307             ASSERT_TRUE(statusMap.size() == devices.size());
1308             for (const auto &pair : statusMap) {
1309                 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1310                 EXPECT_TRUE(pair.second == OK);
1311             }
1312         }, query, false);
1313     ASSERT_TRUE(status == OK);
1314     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1315     EXPECT_EQ(sendRequestCount, 0);
1316 }
1317 
1318 /**
1319   * @tc.name: GetDataNotify001
1320   * @tc.desc: Test GetDataNotify function, delay < 30s should sync ok, > 36 should timeout
1321   * @tc.type: FUNC
1322   * @tc.require: AR000D4876
1323   * @tc.author: zhangqiquan
1324   */
1325 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify001, TestSize.Level3)
1326 {
1327     ASSERT_NE(g_kvDelegatePtr, nullptr);
1328     DBStatus status = OK;
1329     std::vector<std::string> devices;
1330     devices.push_back(g_deviceB->GetDeviceId());
1331     const std::string DEVICE_A = "real_device";
1332     /**
1333      * @tc.steps: step1. deviceB set get data delay 40s
1334      */
1335     g_deviceB->DelayGetSyncData(WAIT_40_SECONDS);
1336     g_communicatorAggregator->SetTimeout(DEVICE_A, TIMEOUT_6_SECONDS);
1337 
1338     /**
1339      * @tc.steps: step2. deviceA call sync and wait
1340      * @tc.expected: step2. sync should return OK. onComplete should be called, deviceB sync TIME_OUT.
1341      */
1342     std::map<std::string, DBStatus> result;
1343     std::map<std::string, int> virtualRes;
1344     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1345     EXPECT_EQ(status, OK);
1346     EXPECT_EQ(result.size(), devices.size());
1347     EXPECT_TRUE(result[DEVICE_B] == TIME_OUT || result[DEVICE_B] == OK);
1348     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1349     Query query = Query::Select();
__anon0f1c56881702(std::map<std::string, int> resMap) 1350     g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1351         virtualRes = std::move(resMap);
1352     }, true);
1353     EXPECT_EQ(virtualRes.size(), devices.size());
1354     EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_TIMEOUT));
1355     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1356 
1357     /**
1358      * @tc.steps: step3. deviceB set get data delay 30s
1359      */
1360     g_deviceB->DelayGetSyncData(WAIT_30_SECONDS);
1361     /**
1362      * @tc.steps: step4. deviceA call sync and wait
1363      * @tc.expected: step4. sync should return OK. onComplete should be called, deviceB sync OK.
1364      */
1365     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1366     EXPECT_EQ(status, OK);
1367     EXPECT_EQ(result.size(), devices.size());
1368     EXPECT_EQ(result[DEVICE_B], OK);
1369     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
__anon0f1c56881802(std::map<std::string, int> resMap) 1370     g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1371         virtualRes = std::move(resMap);
1372     }, true);
1373     EXPECT_EQ(virtualRes.size(), devices.size());
1374     EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_FINISHED_ALL));
1375     g_deviceB->DelayGetSyncData(0);
1376 }