1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17
18 #include "distributeddb_data_generate_unit_test.h"
19 #include "distributeddb_tools_unit_test.h"
20 #include "kv_virtual_device.h"
21 #include "platform_specific.h"
22 #include "process_system_api_adapter_impl.h"
23 #include "single_ver_data_packet.h"
24 #include "virtual_communicator_aggregator.h"
25
26 using namespace testing::ext;
27 using namespace DistributedDB;
28 using namespace DistributedDBUnitTest;
29 using namespace std;
30
31 namespace {
32 string g_testDir;
33 const string STORE_ID = "kv_stroe_sync_check_test";
34 const std::string DEVICE_B = "deviceB";
35 const std::string DEVICE_C = "deviceC";
36 const int LOCAL_WATER_MARK_NOT_INIT = 0xaa;
37 const int EIGHT_HUNDRED = 800;
38 const int NORMAL_SYNC_SEND_REQUEST_CNT = 3;
39 const int TWO_CNT = 2;
40 const int SLEEP_MILLISECONDS = 500;
41 const int TEN_SECONDS = 10;
42 const int THREE_HUNDRED = 300;
43 const int WAIT_30_SECONDS = 30000;
44 const int WAIT_40_SECONDS = 40000;
45 const int TIMEOUT_6_SECONDS = 6000;
46
47 KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
48 KvStoreConfig g_config;
49 DistributedDBToolsUnitTest g_tool;
50 DBStatus g_kvDelegateStatus = INVALID_ARGS;
51 KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
52 VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
53 KvVirtualDevice* g_deviceB = nullptr;
54 KvVirtualDevice* g_deviceC = nullptr;
55 VirtualSingleVerSyncDBInterface *g_syncInterfaceB = nullptr;
56 VirtualSingleVerSyncDBInterface *g_syncInterfaceC = nullptr;
57
58 // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
59 auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
60 placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
61 #ifndef LOW_LEVEL_MEM_DEV
62 const int KEY_LEN = 20; // 20 Bytes
63 const int VALUE_LEN = 4 * 1024 * 1024; // 4MB
64 const int ENTRY_NUM = 2; // 16 entries
65 #endif
66 }
67
68 class DistributedDBSingleVerP2PSyncCheckTest : public testing::Test {
69 public:
70 static void SetUpTestCase(void);
71 static void TearDownTestCase(void);
72 void SetUp();
73 void TearDown();
74 };
75
SetUpTestCase(void)76 void DistributedDBSingleVerP2PSyncCheckTest::SetUpTestCase(void)
77 {
78 /**
79 * @tc.setup: Init datadir and Virtual Communicator.
80 */
81 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
82 g_config.dataDir = g_testDir;
83 g_mgr.SetKvStoreConfig(g_config);
84
85 string dir = g_testDir + "/single_ver";
86 DIR* dirTmp = opendir(dir.c_str());
87 if (dirTmp == nullptr) {
88 OS::MakeDBDirectory(dir);
89 } else {
90 closedir(dirTmp);
91 }
92
93 g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
94 ASSERT_TRUE(g_communicatorAggregator != nullptr);
95 RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
96
97 std::shared_ptr<ProcessSystemApiAdapterImpl> g_adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
98 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(g_adapter);
99 }
100
TearDownTestCase(void)101 void DistributedDBSingleVerP2PSyncCheckTest::TearDownTestCase(void)
102 {
103 /**
104 * @tc.teardown: Release virtual Communicator and clear data dir.
105 */
106 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
107 LOGE("rm test db files error!");
108 }
109 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
110 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
111 }
112
SetUp(void)113 void DistributedDBSingleVerP2PSyncCheckTest::SetUp(void)
114 {
115 DistributedDBToolsUnitTest::PrintTestCaseInfo();
116 /**
117 * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
118 */
119 KvStoreNbDelegate::Option option;
120 option.secOption.securityLabel = SecurityLabel::S3;
121 option.secOption.securityFlag = SecurityFlag::SECE;
122 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
123 ASSERT_TRUE(g_kvDelegateStatus == OK);
124 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
125 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
126 ASSERT_TRUE(g_deviceB != nullptr);
127 g_syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
128 ASSERT_TRUE(g_syncInterfaceB != nullptr);
129 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, g_syncInterfaceB), E_OK);
130
131 g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
132 ASSERT_TRUE(g_deviceC != nullptr);
133 g_syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
134 ASSERT_TRUE(g_syncInterfaceC != nullptr);
135 ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, g_syncInterfaceC), E_OK);
136 }
137
TearDown(void)138 void DistributedDBSingleVerP2PSyncCheckTest::TearDown(void)
139 {
140 /**
141 * @tc.teardown: Release device A, B, C
142 */
143 if (g_kvDelegatePtr != nullptr) {
144 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
145 g_kvDelegatePtr = nullptr;
146 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
147 LOGD("delete kv store status %d", status);
148 ASSERT_TRUE(status == OK);
149 }
150 if (g_deviceB != nullptr) {
151 delete g_deviceB;
152 g_deviceB = nullptr;
153 }
154 if (g_deviceC != nullptr) {
155 delete g_deviceC;
156 g_deviceC = nullptr;
157 }
158 if (g_communicatorAggregator != nullptr) {
159 g_communicatorAggregator->RegOnDispatch(nullptr);
160 }
161 }
162
163 /**
164 * @tc.name: sec option check Sync 001
165 * @tc.desc: if sec option not equal, forbid sync
166 * @tc.type: FUNC
167 * @tc.require: AR000EV1G6
168 * @tc.author: wangchuanqing
169 */
170 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck001, TestSize.Level1)
171 {
172 DBStatus status = OK;
173 std::vector<std::string> devices;
174 devices.push_back(g_deviceB->GetDeviceId());
175 devices.push_back(g_deviceC->GetDeviceId());
176
177 /**
178 * @tc.steps: step1. deviceA put {k1, v1}
179 */
180 Key key = {'1'};
181 Value value = {'1'};
182 status = g_kvDelegatePtr->Put(key, value);
183 ASSERT_TRUE(status == OK);
184
185 ASSERT_TRUE(g_syncInterfaceB != nullptr);
186 ASSERT_TRUE(g_syncInterfaceC != nullptr);
187 SecurityOption secOption{SecurityLabel::S4, SecurityFlag::ECE};
188 g_syncInterfaceB->SetSecurityOption(secOption);
189 g_syncInterfaceC->SetSecurityOption(secOption);
190
191 /**
192 * @tc.steps: step2. deviceA call sync and wait
193 * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
194 */
195 std::map<std::string, DBStatus> result;
196 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
197 ASSERT_TRUE(status == OK);
198
199 ASSERT_TRUE(result.size() == devices.size());
200 for (const auto &pair : result) {
201 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
202 EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
203 }
204 VirtualDataItem item;
205 g_deviceB->GetData(key, item);
206 EXPECT_TRUE(item.value.empty());
207 g_deviceC->GetData(key, item);
208 EXPECT_TRUE(item.value.empty());
209 }
210
211 /**
212 * @tc.name: sec option check Sync 002
213 * @tc.desc: if sec option not equal, forbid sync
214 * @tc.type: FUNC
215 * @tc.require: AR000EV1G6
216 * @tc.author: wangchuanqing
217 */
218 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck002, TestSize.Level1)
219 {
220 DBStatus status = OK;
221 std::vector<std::string> devices;
222 devices.push_back(g_deviceB->GetDeviceId());
223 devices.push_back(g_deviceC->GetDeviceId());
224
225 /**
226 * @tc.steps: step1. deviceA put {k1, v1}
227 */
228 Key key = {'1'};
229 Value value = {'1'};
230 status = g_kvDelegatePtr->Put(key, value);
231 ASSERT_TRUE(status == OK);
232
233 ASSERT_TRUE(g_syncInterfaceC != nullptr);
234 SecurityOption secOption{SecurityLabel::S4, SecurityFlag::ECE};
235 g_syncInterfaceC->SetSecurityOption(secOption);
236 secOption.securityLabel = SecurityLabel::S3;
237 secOption.securityFlag = SecurityFlag::SECE;
238 g_syncInterfaceB->SetSecurityOption(secOption);
239
240 /**
241 * @tc.steps: step2. deviceA call sync and wait
242 * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
243 */
244 std::map<std::string, DBStatus> result;
245 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
246 ASSERT_TRUE(status == OK);
247
248 ASSERT_TRUE(result.size() == devices.size());
249 for (const auto &pair : result) {
250 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
251 if (pair.first == DEVICE_B) {
252 EXPECT_TRUE(pair.second == OK);
253 } else {
254 EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
255 }
256 }
257 VirtualDataItem item;
258 g_deviceC->GetData(key, item);
259 EXPECT_TRUE(item.value.empty());
260 g_deviceB->GetData(key, item);
261 EXPECT_TRUE(item.value == value);
262 }
263
264 /**
265 * @tc.name: sec option check Sync 003
266 * @tc.desc: if sec option equal, check not pass, forbid sync
267 * @tc.type: FUNC
268 * @tc.require: AR000EV1G6
269 * @tc.author: zhangqiquan
270 */
271 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck003, TestSize.Level1)
272 {
273 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
274 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon5f4525660202(const std::string &, const SecurityOption &) 275 adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
276 return false;
277 });
278 /**
279 * @tc.steps: step1. record packet
280 * @tc.expected: step1. sync should failed in source.
281 */
282 std::atomic<int> messageCount = 0;
__anon5f4525660302(const std::string &, Message *) 283 g_communicatorAggregator->RegOnDispatch([&messageCount](const std::string &, Message *) {
284 messageCount++;
285 });
286 /**
287 * @tc.steps: step2. deviceA call sync and wait
288 * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
289 */
290 DBStatus status = OK;
291 std::vector<std::string> devices;
292 devices.push_back(g_deviceB->GetDeviceId());
293 std::map<std::string, DBStatus> result;
294 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
295 EXPECT_EQ(status, OK);
296 EXPECT_EQ(messageCount, 6); // 6 = 2 time sync + 4 ability sync
297 for (const auto &pair : result) {
298 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
299 EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
300 }
301 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
302 g_communicatorAggregator->RegOnDispatch(nullptr);
303 }
304
305 /**
306 * @tc.name: sec option check Sync 004
307 * @tc.desc: memory db not check device security
308 * @tc.type: FUNC
309 * @tc.require:
310 * @tc.author: zhangqiquan
311 */
312 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck004, TestSize.Level1)
313 {
314 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
315 g_kvDelegatePtr = nullptr;
316 KvStoreNbDelegate::Option option;
317 option.secOption.securityLabel = SecurityLabel::NOT_SET;
318 option.isMemoryDb = true;
319 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
320 ASSERT_TRUE(g_kvDelegateStatus == OK);
321 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
322
323 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
324 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon5f4525660402(const std::string &, const SecurityOption &) 325 adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
326 return false;
327 });
__anon5f4525660502(const std::string &, SecurityOption &securityOption) 328 adapter->ForkGetSecurityOption([](const std::string &, SecurityOption &securityOption) {
329 securityOption.securityLabel = NOT_SET;
330 return OK;
331 });
__anon5f4525660602(SecurityOption &) 332 g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &) {
333 return -E_NOT_SUPPORT;
334 });
335
336 std::vector<std::string> devices;
337 devices.push_back(g_deviceB->GetDeviceId());
338 std::map<std::string, DBStatus> result;
339 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
340 EXPECT_EQ(status, OK);
341 for (const auto &pair : result) {
342 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
343 EXPECT_TRUE(pair.second == OK);
344 }
345
346 adapter->ForkCheckDeviceSecurityAbility(nullptr);
347 adapter->ForkGetSecurityOption(nullptr);
348 g_syncInterfaceB->ForkGetSecurityOption(nullptr);
349 }
350
351 #ifndef LOW_LEVEL_MEM_DEV
352 /**
353 * @tc.name: BigDataSync001
354 * @tc.desc: big data sync push mode.
355 * @tc.type: FUNC
356 * @tc.require: AR000F3OOU
357 * @tc.author: wangchuanqing
358 */
359 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync001, TestSize.Level1)
360 {
361 DBStatus status = OK;
362 std::vector<std::string> devices;
363 devices.push_back(g_deviceB->GetDeviceId());
364 devices.push_back(g_deviceC->GetDeviceId());
365
366 /**
367 * @tc.steps: step1. deviceA put 16 bigData
368 */
369 std::vector<Entry> entries;
370 std::vector<Key> keys;
371 DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
372 for (const auto &entry : entries) {
373 status = g_kvDelegatePtr->Put(entry.key, entry.value);
374 ASSERT_TRUE(status == OK);
375 }
376
377 /**
378 * @tc.steps: step2. deviceA call sync and wait
379 * @tc.expected: step2. sync should return OK.
380 */
381 std::map<std::string, DBStatus> result;
382 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
383 ASSERT_TRUE(status == OK);
384
385 /**
386 * @tc.expected: step2. onComplete should be called, DeviceB,C have {k1,v1}
387 */
388 ASSERT_TRUE(result.size() == devices.size());
389 for (const auto &pair : result) {
390 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
391 EXPECT_TRUE(pair.second == OK);
392 }
393 VirtualDataItem item;
394 for (const auto &entry : entries) {
395 item.value.clear();
396 g_deviceB->GetData(entry.key, item);
397 EXPECT_TRUE(item.value == entry.value);
398 item.value.clear();
399 g_deviceC->GetData(entry.key, item);
400 EXPECT_TRUE(item.value == entry.value);
401 }
402 }
403
404 /**
405 * @tc.name: BigDataSync002
406 * @tc.desc: big data sync pull mode.
407 * @tc.type: FUNC
408 * @tc.require: AR000F3OOU
409 * @tc.author: wangchuanqing
410 */
411 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync002, TestSize.Level1)
412 {
413 DBStatus status = OK;
414 std::vector<std::string> devices;
415 devices.push_back(g_deviceB->GetDeviceId());
416 devices.push_back(g_deviceC->GetDeviceId());
417
418 /**
419 * @tc.steps: step1. deviceA deviceB put bigData
420 */
421 std::vector<Entry> entries;
422 std::vector<Key> keys;
423 DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
424
425 for (uint32_t i = 0; i < entries.size(); i++) {
426 if (i % 2 == 0) {
427 g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
428 } else {
429 g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
430 }
431 }
432
433 /**
434 * @tc.steps: step3. deviceA call pull sync
435 * @tc.expected: step3. sync should return OK.
436 */
437 std::map<std::string, DBStatus> result;
438 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
439 ASSERT_TRUE(status == OK);
440
441 /**
442 * @tc.expected: step3. onComplete should be called, DeviceA have all bigData
443 */
444 ASSERT_TRUE(result.size() == devices.size());
445 for (const auto &pair : result) {
446 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
447 EXPECT_TRUE(pair.second == OK);
448 }
449 for (const auto &entry : entries) {
450 Value value;
451 EXPECT_EQ(g_kvDelegatePtr->Get(entry.key, value), OK);
452 EXPECT_EQ(value, entry.value);
453 }
454 }
455
456 /**
457 * @tc.name: BigDataSync003
458 * @tc.desc: big data sync pushAndPull mode.
459 * @tc.type: FUNC
460 * @tc.require: AR000F3OOV
461 * @tc.author: wangchuanqing
462 */
463 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync003, TestSize.Level1)
464 {
465 DBStatus status = OK;
466 std::vector<std::string> devices;
467 devices.push_back(g_deviceB->GetDeviceId());
468 devices.push_back(g_deviceC->GetDeviceId());
469
470 /**
471 * @tc.steps: step1. deviceA deviceB put bigData
472 */
473 std::vector<Entry> entries;
474 std::vector<Key> keys;
475 DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
476
477 for (uint32_t i = 0; i < entries.size(); i++) {
478 if (i % 3 == 0) { // 0 3 6 9 12 15 for deivec B
479 g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
480 } else if (i % 3 == 1) { // 1 4 7 10 13 16 for device C
481 g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
482 } else { // 2 5 8 11 14 for device A
483 status = g_kvDelegatePtr->Put(entries[i].key, entries[i].value);
484 ASSERT_TRUE(status == OK);
485 }
486 }
487
488 /**
489 * @tc.steps: step3. deviceA call pushAndpull sync
490 * @tc.expected: step3. sync should return OK.
491 */
492 std::map<std::string, DBStatus> result;
493 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
494 ASSERT_TRUE(status == OK);
495
496 /**
497 * @tc.expected: step3. onComplete should be called, DeviceA have all bigData
498 * deviceB and deviceC has deviceA data
499 */
500 ASSERT_TRUE(result.size() == devices.size());
501 for (const auto &pair : result) {
502 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
503 EXPECT_TRUE(pair.second == OK);
504 }
505
506 VirtualDataItem item;
507 for (uint32_t i = 0; i < entries.size(); i++) {
508 Value value;
509 EXPECT_EQ(g_kvDelegatePtr->Get(entries[i].key, value), OK);
510 EXPECT_EQ(value, entries[i].value);
511
512 if (i % 3 == 2) { // 2 5 8 11 14 for device A
513 item.value.clear();
514 g_deviceB->GetData(entries[i].key, item);
515 EXPECT_TRUE(item.value == entries[i].value);
516 item.value.clear();
517 g_deviceC->GetData(entries[i].key, item);
518 EXPECT_TRUE(item.value == entries[i].value);
519 }
520 }
521 }
522 #endif
523
524 /**
525 * @tc.name: PushFinishedNotify 001
526 * @tc.desc: Test remote device push finished notify function.
527 * @tc.type: FUNC
528 * @tc.require: AR000CQS3S
529 * @tc.author: xushaohua
530 */
531 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, PushFinishedNotify001, TestSize.Level1)
532 {
533 std::vector<std::string> devices;
534 devices.push_back(g_deviceB->GetDeviceId());
535
536 /**
537 * @tc.steps: step1. deviceA call SetRemotePushFinishedNotify
538 * @tc.expected: step1. set should return OK.
539 */
540 int pushfinishedFlag = 0;
541 DBStatus status = g_kvDelegatePtr->SetRemotePushFinishedNotify(
__anon5f4525660702(const RemotePushNotifyInfo &info) 542 [&pushfinishedFlag](const RemotePushNotifyInfo &info) {
543 EXPECT_TRUE(info.deviceId == DEVICE_B);
544 pushfinishedFlag = 1;
545 });
546 ASSERT_EQ(status, OK);
547
548 /**
549 * @tc.steps: step2. deviceB put k2, v2, and deviceA pull from deviceB
550 * @tc.expected: step2. deviceA can not receive push finished notify
551 */
552 EXPECT_EQ(g_kvDelegatePtr->Put(KEY_2, VALUE_2), OK);
553 std::map<std::string, DBStatus> result;
554 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
555 EXPECT_TRUE(status == OK);
556 EXPECT_EQ(pushfinishedFlag, 0);
557 pushfinishedFlag = 0;
558
559 /**
560 * @tc.steps: step3. deviceB put k3, v3, and deviceA push and pull to deviceB
561 * @tc.expected: step3. deviceA can not receive push finished notify
562 */
563 EXPECT_EQ(g_kvDelegatePtr->Put(KEY_3, VALUE_3), OK);
564 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
565 EXPECT_TRUE(status == OK);
566 EXPECT_EQ(pushfinishedFlag, 0);
567 pushfinishedFlag = 0;
568
569 /**
570 * @tc.steps: step4. deviceA call SetRemotePushFinishedNotify to reset notify
571 * @tc.expected: step4. set should return OK.
572 */
__anon5f4525660802(const RemotePushNotifyInfo &info) 573 status = g_kvDelegatePtr->SetRemotePushFinishedNotify([&pushfinishedFlag](const RemotePushNotifyInfo &info) {
574 EXPECT_TRUE(info.deviceId == DEVICE_B);
575 pushfinishedFlag = 2;
576 });
577 ASSERT_EQ(status, OK);
578
579 /**
580 * @tc.steps: step5. deviceA call SetRemotePushFinishedNotify set null to unregist
581 * @tc.expected: step5. set should return OK.
582 */
583 status = g_kvDelegatePtr->SetRemotePushFinishedNotify(nullptr);
584 ASSERT_EQ(status, OK);
585 }
586
587 namespace {
RegOnDispatchWithDelayAck(bool & errCodeAck,bool & afterErrAck)588 void RegOnDispatchWithDelayAck(bool &errCodeAck, bool &afterErrAck)
589 {
590 // just delay the busy ack
591 g_communicatorAggregator->RegOnDispatch([&errCodeAck, &afterErrAck](const std::string &dev, Message *inMsg) {
592 if (dev != g_deviceB->GetDeviceId()) {
593 return;
594 }
595 auto *packet = inMsg->GetObject<DataAckPacket>();
596 if (packet->GetRecvCode() == -E_BUSY) {
597 errCodeAck = true;
598 while (!afterErrAck) {
599 }
600 LOGW("NOW SEND BUSY ACK");
601 } else if (errCodeAck) {
602 afterErrAck = true;
603 std::this_thread::sleep_for(std::chrono::seconds(1));
604 }
605 });
606 }
607
RegOnDispatchWithOffline(bool & offlineFlag,bool & invalid,condition_variable & conditionOffline)608 void RegOnDispatchWithOffline(bool &offlineFlag, bool &invalid, condition_variable &conditionOffline)
609 {
610 g_communicatorAggregator->RegOnDispatch([&offlineFlag, &invalid, &conditionOffline](
611 const std::string &dev, Message *inMsg) {
612 auto *packet = inMsg->GetObject<DataAckPacket>();
613 if (dev != DEVICE_B) {
614 if (packet->GetRecvCode() == LOCAL_WATER_MARK_NOT_INIT) {
615 offlineFlag = true;
616 conditionOffline.notify_all();
617 LOGW("[Dispatch] NOTIFY OFFLINE");
618 std::this_thread::sleep_for(std::chrono::microseconds(EIGHT_HUNDRED));
619 }
620 } else if (!invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
621 LOGW("[Dispatch] NOW INVALID THIS MSG");
622 inMsg->SetMessageType(TYPE_INVALID);
623 inMsg->SetMessageId(INVALID_MESSAGE_ID);
624 invalid = true;
625 }
626 });
627 }
628
RegOnDispatchWithInvalidMsg(bool & invalid)629 void RegOnDispatchWithInvalidMsg(bool &invalid)
630 {
631 g_communicatorAggregator->RegOnDispatch([&invalid](
632 const std::string &dev, Message *inMsg) {
633 if (dev == DEVICE_B && !invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
634 LOGW("[Dispatch] NOW INVALID THIS MSG");
635 inMsg->SetMessageType(TYPE_INVALID);
636 inMsg->SetMessageId(INVALID_MESSAGE_ID);
637 invalid = true;
638 }
639 });
640 }
641
PrepareEnv(vector<std::string> & devices,Key & key,Query & query)642 void PrepareEnv(vector<std::string> &devices, Key &key, Query &query)
643 {
644 /**
645 * @tc.steps: step1. ensure the watermark is no zero and finish timeSync and abilitySync
646 * @tc.expected: step1. should return OK.
647 */
648 Value value = {'1'};
649 std::map<std::string, DBStatus> result;
650 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
651
652 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result, query);
653 EXPECT_TRUE(status == OK);
654 ASSERT_TRUE(result[g_deviceB->GetDeviceId()] == OK);
655 }
656
Sync(vector<std::string> & devices,const DBStatus & targetStatus)657 void Sync(vector<std::string> &devices, const DBStatus &targetStatus)
658 {
659 std::map<std::string, DBStatus> result;
660 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result);
661 EXPECT_TRUE(status == OK);
662 for (const auto &deviceId : devices) {
663 ASSERT_TRUE(result[deviceId] == targetStatus);
664 }
665 }
666
SyncWithQuery(vector<std::string> & devices,const Query & query,const SyncMode & mode,const DBStatus & targetStatus)667 void SyncWithQuery(vector<std::string> &devices, const Query &query, const SyncMode &mode,
668 const DBStatus &targetStatus)
669 {
670 std::map<std::string, DBStatus> result;
671 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, mode, result, query);
672 EXPECT_TRUE(status == OK);
673 for (const auto &deviceId : devices) {
674 ASSERT_TRUE(result[deviceId] == targetStatus);
675 }
676 }
677
SyncWithQuery(vector<std::string> & devices,const Query & query,const DBStatus & targetStatus)678 void SyncWithQuery(vector<std::string> &devices, const Query &query, const DBStatus &targetStatus)
679 {
680 SyncWithQuery(devices, query, DistributedDB::SYNC_MODE_PUSH_ONLY, targetStatus);
681 }
682
SyncWithDeviceOffline(vector<std::string> & devices,Key & key,const Query & query)683 void SyncWithDeviceOffline(vector<std::string> &devices, Key &key, const Query &query)
684 {
685 Value value = {'2'};
686 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
687
688 /**
689 * @tc.steps: step2. invalid the sync msg
690 * @tc.expected: step2. should return TIME_OUT.
691 */
692 SyncWithQuery(devices, query, TIME_OUT);
693
694 /**
695 * @tc.steps: step3. device offline when sync
696 * @tc.expected: step3. should return COMM_FAILURE.
697 */
698 SyncWithQuery(devices, query, COMM_FAILURE);
699 }
700
PrepareWaterMarkError(std::vector<std::string> & devices,Query & query)701 void PrepareWaterMarkError(std::vector<std::string> &devices, Query &query)
702 {
703 /**
704 * @tc.steps: step1. prepare data
705 */
706 devices.push_back(g_deviceB->GetDeviceId());
707 g_deviceB->Online();
708
709 Key key = {'1'};
710 query = Query::Select().PrefixKey(key);
711 PrepareEnv(devices, key, query);
712
713 /**
714 * @tc.steps: step2. query sync and set queryWaterMark
715 * @tc.expected: step2. should return OK.
716 */
717 Value value = {'2'};
718 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
719 SyncWithQuery(devices, query, OK);
720
721 /**
722 * @tc.steps: step3. sync and invalid msg for set local device waterMark
723 * @tc.expected: step3. should return TIME_OUT.
724 */
725 bool invalidMsg = false;
726 RegOnDispatchWithInvalidMsg(invalidMsg);
727 value = {'3'};
728 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
729 Sync(devices, TIME_OUT);
730 g_communicatorAggregator->RegOnDispatch(nullptr);
731 }
732 }
733
734 /**
735 * @tc.name: AckSessionCheck 001
736 * @tc.desc: Test ack session check function.
737 * @tc.type: FUNC
738 * @tc.require: AR000F3OOV
739 * @tc.author: zhangqiquan
740 */
741 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, AckSessionCheck001, TestSize.Level3)
742 {
743 std::vector<std::string> devices;
744 devices.push_back(g_deviceB->GetDeviceId());
745
746 /**
747 * @tc.steps: step1. deviceB sync to deviceA just for timeSync and abilitySync
748 * @tc.expected: step1. should return OK.
749 */
750 ASSERT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
751
752 /**
753 * @tc.steps: step2. deviceA StartTransaction for prevent other sync action deviceB sync will fail
754 * @tc.expected: step2. should return OK.
755 */
756 ASSERT_TRUE(g_kvDelegatePtr->StartTransaction() == OK);
757
758 bool errCodeAck = false;
759 bool afterErrAck = false;
760 RegOnDispatchWithDelayAck(errCodeAck, afterErrAck);
761
762 Key key = {'1'};
763 Value value = {'1'};
764 Timestamp currentTime;
765 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
766 EXPECT_TRUE(g_deviceB->PutData(key, value, currentTime, 0) == OK);
767 EXPECT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
768
769 Value outValue;
770 EXPECT_TRUE(g_kvDelegatePtr->Get(key, outValue) == NOT_FOUND);
771
772 /**
773 * @tc.steps: step3. release the writeHandle and try again, sync success
774 * @tc.expected: step3. should return OK.
775 */
776 EXPECT_TRUE(g_kvDelegatePtr->Commit() == OK);
777 EXPECT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
778
779 EXPECT_TRUE(g_kvDelegatePtr->Get(key, outValue) == E_OK);
780 EXPECT_EQ(outValue, value);
781 }
782
783 /**
784 * @tc.name: AckSafeCheck001
785 * @tc.desc: Test ack session check filter all bad ack in device offline scene.
786 * @tc.type: FUNC
787 * @tc.require: AR000F3OOV
788 * @tc.author: zhangqiquan
789 */
790 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, AckSafeCheck001, TestSize.Level3)
791 {
792 std::vector<std::string> devices;
793 devices.push_back(g_deviceB->GetDeviceId());
794 g_deviceB->Online();
795
796 Key key = {'1'};
797 Query query = Query::Select().PrefixKey(key);
798 PrepareEnv(devices, key, query);
799
800 std::condition_variable conditionOnline;
801 std::condition_variable conditionOffline;
802 bool onlineFlag = false;
803 bool invalid = false;
804 bool offlineFlag = false;
__anon5f4525660d02() 805 thread subThread([&onlineFlag, &conditionOnline, &offlineFlag, &conditionOffline]() {
806 LOGW("[Dispatch] NOW DEVICES IS OFFLINE");
807 std::mutex offlineMtx;
808 std::unique_lock<std::mutex> lck(offlineMtx);
809 conditionOffline.wait(lck, [&offlineFlag]{ return offlineFlag; });
810 g_deviceB->Offline();
811 std::this_thread::sleep_for(std::chrono::milliseconds(100));
812 g_deviceB->Online();
813 onlineFlag = true;
814 conditionOnline.notify_all();
815 LOGW("[Dispatch] NOW DEVICES IS ONLINE");
816 });
817 subThread.detach();
818
819 RegOnDispatchWithOffline(offlineFlag, invalid, conditionOffline);
820
821 SyncWithDeviceOffline(devices, key, query);
822
823 std::mutex onlineMtx;
824 std::unique_lock<std::mutex> lck(onlineMtx);
__anon5f4525660f02null825 conditionOnline.wait(lck, [&onlineFlag]{ return onlineFlag; });
826
827 /**
828 * @tc.steps: step4. sync again if has problem it will sync never end
829 * @tc.expected: step4. should return OK.
830 */
831 SyncWithQuery(devices, query, OK);
832 }
833
834 /**
835 * @tc.name: WaterMarkCheck001
836 * @tc.desc: Test waterMark work correct in lost package scene.
837 * @tc.type: FUNC
838 * @tc.require: AR000F3OOV
839 * @tc.author: zhangqiquan
840 */
841 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, WaterMarkCheck001, TestSize.Level1)
842 {
843 std::vector<std::string> devices;
844 Query query = Query::Select();
845 PrepareWaterMarkError(devices, query);
846
847 /**
848 * @tc.steps: step4. sync again see it work correct
849 * @tc.expected: step4. should return OK.
850 */
851 SyncWithQuery(devices, query, OK);
852 }
853
854 /**
855 * @tc.name: WaterMarkCheck002
856 * @tc.desc: Test pull work correct in error waterMark scene.
857 * @tc.type: FUNC
858 * @tc.require: AR000F3OOV
859 * @tc.author: zhangqiquan
860 */
861 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, WaterMarkCheck002, TestSize.Level1)
862 {
863 std::vector<std::string> devices;
864 Query query = Query::Select();
865 PrepareWaterMarkError(devices, query);
866
867 /**
868 * @tc.steps: step4. sync again see it work correct
869 * @tc.expected: step4. should return OK.
870 */
871 Key key = {'2'};
872 ASSERT_TRUE(g_kvDelegatePtr->Put(key, {}) == OK);
873 query = Query::Select();
874 SyncWithQuery(devices, query, DistributedDB::SYNC_MODE_PULL_ONLY, OK);
875
876 VirtualDataItem item;
877 EXPECT_EQ(g_deviceB->GetData(key, item), -E_NOT_FOUND);
878 }
879
RegOnDispatchToGetSyncCount(int & sendRequestCount,int sleepMs=0)880 void RegOnDispatchToGetSyncCount(int &sendRequestCount, int sleepMs = 0)
881 {
882 g_communicatorAggregator->RegOnDispatch([sleepMs, &sendRequestCount](
883 const std::string &dev, Message *inMsg) {
884 if (dev == DEVICE_B && inMsg->GetMessageType() == TYPE_REQUEST) {
885 std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
886 sendRequestCount++;
887 LOGD("sendRequestCount++...");
888 }
889 });
890 }
891
TestDifferentSyncMode(SyncMode mode)892 void TestDifferentSyncMode(SyncMode mode)
893 {
894 std::vector<std::string> devices;
895 devices.push_back(g_deviceB->GetDeviceId());
896
897 /**
898 * @tc.steps: step1. deviceA put {k1, v1}
899 */
900 Key key = {'1'};
901 Value value = {'1'};
902 DBStatus status = g_kvDelegatePtr->Put(key, value);
903 ASSERT_TRUE(status == OK);
904
905 int sendRequestCount = 0;
906 RegOnDispatchToGetSyncCount(sendRequestCount);
907
908 /**
909 * @tc.steps: step2. deviceA call sync and wait
910 * @tc.expected: step2. sync should return OK.
911 */
912 std::map<std::string, DBStatus> result;
913 status = g_tool.SyncTest(g_kvDelegatePtr, devices, mode, result);
914 ASSERT_TRUE(status == OK);
915
916 /**
917 * @tc.expected: step2. onComplete should be called, DeviceB have {k1,v1}, send request message 3 times
918 */
919 ASSERT_TRUE(result.size() == devices.size());
920 for (const auto &pair : result) {
921 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
922 EXPECT_TRUE(pair.second == OK);
923 }
924 VirtualDataItem item;
925 g_deviceB->GetData(key, item);
926 EXPECT_TRUE(item.value == value);
927
928 EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
929
930 /**
931 * @tc.steps: step3. reset sendRequestCount to 0, deviceA call sync and wait again without any change in db
932 * @tc.expected: step3. sync should return OK, and sendRequestCount should be 1, because this merge can not
933 * be skipped
934 */
935 sendRequestCount = 0;
936 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
937 ASSERT_TRUE(status == OK);
938 EXPECT_EQ(sendRequestCount, 1);
939 }
940
941 /**
942 * @tc.name: PushSyncMergeCheck001
943 * @tc.desc: Test push sync task merge, task can not be merged when the two sync task is not in the queue
944 * at the same time.
945 * @tc.type: FUNC
946 * @tc.require: AR000F3OOV
947 * @tc.author: zhangshijie
948 */
949 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck001, TestSize.Level1)
950 {
951 TestDifferentSyncMode(SYNC_MODE_PUSH_ONLY);
952 }
953
954 /**
955 * @tc.name: PushSyncMergeCheck002
956 * @tc.desc: Test push_pull sync task merge, task can not be merged when the two sync task is not in the queue
957 * at the same time.
958 * @tc.type: FUNC
959 * @tc.require: AR000F3OOV
960 * @tc.author: zhangshijie
961 */
962 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck002, TestSize.Level1)
963 {
964 TestDifferentSyncMode(SYNC_MODE_PUSH_PULL);
965 }
966
PrepareForSyncMergeTest(std::vector<std::string> & devices,int & sendRequestCount)967 void PrepareForSyncMergeTest(std::vector<std::string> &devices, int &sendRequestCount)
968 {
969 /**
970 * @tc.steps: step1. deviceA put {k1, v1}
971 */
972 Key key = {'1'};
973 Value value = {'1'};
974 DBStatus status = g_kvDelegatePtr->Put(key, value);
975 ASSERT_TRUE(status == OK);
976
977 RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
978
979 /**
980 * @tc.steps: step2. deviceA call sync and don't wait
981 * @tc.expected: step2. sync should return OK.
982 */
983 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
984 [&sendRequestCount, devices, key, value](const std::map<std::string, DBStatus>& statusMap) {
985 ASSERT_TRUE(statusMap.size() == devices.size());
986 for (const auto &pair : statusMap) {
987 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
988 EXPECT_TRUE(pair.second == OK);
989 }
990 VirtualDataItem item;
991 g_deviceB->GetData(key, item);
992 EXPECT_EQ(item.value, value);
993 EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
994
995 // reset sendRequestCount to 0
996 sendRequestCount = 0;
997 });
998 ASSERT_TRUE(status == OK);
999 }
1000
1001 /**
1002 * @tc.name: PushSyncMergeCheck003
1003 * @tc.desc: Test push sync task merge, task can not be merged when there is change in db since last push sync
1004 * @tc.type: FUNC
1005 * @tc.require: AR000F3OOV
1006 * @tc.author: zhangshijie
1007 */
1008 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck003, TestSize.Level3)
1009 {
1010 DBStatus status = OK;
1011 std::vector<std::string> devices;
1012 devices.push_back(g_deviceB->GetDeviceId());
1013
1014 int sendRequestCount = 0;
1015 PrepareForSyncMergeTest(devices, sendRequestCount);
1016
1017 /**
1018 * @tc.steps: step3. deviceA call sync and don't wait
1019 * @tc.expected: step3. sync should return OK.
1020 */
1021 Key key = {'1'};
1022 Value value = {'2'};
1023 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon5f4525661202(const std::map<std::string, DBStatus>& statusMap) 1024 [&sendRequestCount, devices, key, value, this](const std::map<std::string, DBStatus>& statusMap) {
1025 /**
1026 * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1027 * skipped, but it is no need to do time sync and ability sync, only need to do data sync
1028 */
1029 ASSERT_TRUE(statusMap.size() == devices.size());
1030 for (const auto &pair : statusMap) {
1031 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1032 EXPECT_TRUE(pair.second == OK);
1033 }
1034 VirtualDataItem item;
1035 g_deviceB->GetData(key, item);
1036 EXPECT_EQ(item.value, value);
1037 });
1038 ASSERT_TRUE(status == OK);
1039
1040 /**
1041 * @tc.steps: step4. deviceA put {k1, v2}
1042 */
1043 while (sendRequestCount < TWO_CNT) {
1044 std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1045 }
1046 status = g_kvDelegatePtr->Put(key, value);
1047 ASSERT_TRUE(status == OK);
1048 // wait for the second sync task finish
1049 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1050 EXPECT_EQ(sendRequestCount, 1);
1051 }
1052
1053 /**
1054 * @tc.name: PushSyncMergeCheck004
1055 * @tc.desc: Test push sync task merge, task can be merged when there is no change in db since last push sync
1056 * @tc.type: FUNC
1057 * @tc.require: AR000F3OOV
1058 * @tc.author: zhangshijie
1059 */
1060 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck004, TestSize.Level3)
1061 {
1062 DBStatus status = OK;
1063 std::vector<std::string> devices;
1064 devices.push_back(g_deviceB->GetDeviceId());
1065
1066 int sendRequestCount = 0;
1067 PrepareForSyncMergeTest(devices, sendRequestCount);
1068
1069 /**
1070 * @tc.steps: step3. deviceA call sync and don't wait
1071 * @tc.expected: step3. sync should return OK.
1072 */
1073 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon5f4525661302(const std::map<std::string, DBStatus>& statusMap) 1074 [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1075 /**
1076 * @tc.expected: when the second sync task return, sendRequestCount should be 0, because this merge can be
1077 * skipped
1078 */
1079 ASSERT_TRUE(statusMap.size() == devices.size());
1080 for (const auto &pair : statusMap) {
1081 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1082 EXPECT_TRUE(pair.second == OK);
1083 }
1084 });
1085 ASSERT_TRUE(status == OK);
1086 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1087 EXPECT_EQ(sendRequestCount, 0);
1088 }
1089
RegOnDispatchWithInvalidMsgAndCnt(int & sendRequestCount,int sleepMs,bool & invalid)1090 void RegOnDispatchWithInvalidMsgAndCnt(int &sendRequestCount, int sleepMs, bool &invalid)
1091 {
1092 g_communicatorAggregator->RegOnDispatch([&sendRequestCount, sleepMs, &invalid](
1093 const std::string &dev, Message *inMsg) {
1094 if (dev == DEVICE_B && !invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
1095 inMsg->SetMessageType(TYPE_INVALID);
1096 inMsg->SetMessageId(INVALID_MESSAGE_ID);
1097 sendRequestCount++;
1098 invalid = true;
1099 LOGW("[Dispatch]invalid THIS MSG, sendRequestCount = %d", sendRequestCount);
1100 std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
1101 }
1102 });
1103 }
1104
1105 /**
1106 * @tc.name: PushSyncMergeCheck005
1107 * @tc.desc: Test push sync task merge, task cannot be merged when the last push sync is failed
1108 * @tc.type: FUNC
1109 * @tc.require: AR000F3OOV
1110 * @tc.author: zhangshijie
1111 */
1112 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck005, TestSize.Level3)
1113 {
1114 DBStatus status = OK;
1115 std::vector<std::string> devices;
1116 devices.push_back(g_deviceB->GetDeviceId());
1117
1118 /**
1119 * @tc.steps: step1. deviceA put {k1, v1}
1120 */
1121 Key key = {'1'};
1122 Value value = {'1'};
1123 status = g_kvDelegatePtr->Put(key, value);
1124 ASSERT_TRUE(status == OK);
1125
1126 int sendRequestCount = 0;
1127 bool invalid = false;
1128 RegOnDispatchWithInvalidMsgAndCnt(sendRequestCount, SLEEP_MILLISECONDS, invalid);
1129
1130 /**
1131 * @tc.steps: step2. deviceA call sync and don't wait
1132 * @tc.expected: step2. sync should return TIME_OUT.
1133 */
1134 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon5f4525661502(const std::map<std::string, DBStatus>& statusMap) 1135 [&sendRequestCount, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1136 ASSERT_TRUE(statusMap.size() == devices.size());
1137 for (const auto &deviceId : devices) {
1138 ASSERT_EQ(statusMap.at(deviceId), TIME_OUT);
1139 }
1140 });
1141 EXPECT_TRUE(status == OK);
1142
1143 /**
1144 * @tc.steps: step3. deviceA call sync and don't wait
1145 * @tc.expected: step3. sync should return OK.
1146 */
1147 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon5f4525661602(const std::map<std::string, DBStatus>& statusMap) 1148 [key, value, &sendRequestCount, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1149 /**
1150 * @tc.expected: when the second sync task return, sendRequestCount should be 3, because this merge can not be
1151 * skipped, deviceB should have {k1, v1}.
1152 */
1153 ASSERT_TRUE(statusMap.size() == devices.size());
1154 for (const auto &pair : statusMap) {
1155 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1156 EXPECT_EQ(pair.second, OK);
1157 }
1158 VirtualDataItem item;
1159 g_deviceB->GetData(key, item);
1160 EXPECT_EQ(item.value, value);
1161 });
1162 ASSERT_TRUE(status == OK);
1163 while (sendRequestCount < 1) {
1164 std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1165 }
1166 sendRequestCount = 0;
1167 RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1168
1169 // wait for the second sync task finish
1170 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1171 EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1172 }
1173
PrePareForQuerySyncMergeTest(bool isQuerySync,std::vector<std::string> & devices,Key & key,Value & value,int & sendRequestCount)1174 void PrePareForQuerySyncMergeTest(bool isQuerySync, std::vector<std::string> &devices,
1175 Key &key, Value &value, int &sendRequestCount)
1176 {
1177 DBStatus status = OK;
1178 /**
1179 * @tc.steps: step1. deviceA put {k1, v1}...{k10, v10}
1180 */
1181 Query query = Query::Select().PrefixKey(key);
1182 const int dataSize = 10;
1183 for (int i = 0; i < dataSize; i++) {
1184 key.push_back(i);
1185 value.push_back(i);
1186 status = g_kvDelegatePtr->Put(key, value);
1187 ASSERT_TRUE(status == OK);
1188 key.pop_back();
1189 value.pop_back();
1190 }
1191
1192 RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1193 /**
1194 * @tc.steps: step2. deviceA call query sync and don't wait
1195 * @tc.expected: step2. sync should return OK.
1196 */
1197 auto completeCallBack = [&sendRequestCount, &key, &value, dataSize, devices]
1198 (const std::map<std::string, DBStatus>& statusMap) {
1199 ASSERT_TRUE(statusMap.size() == devices.size());
1200 for (const auto &pair : statusMap) {
1201 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1202 EXPECT_EQ(pair.second, OK);
1203 }
1204 // when first sync finish, DeviceB have {k1,v1}, {k3,v3}, {k5,v5} .. send request message 3 times
1205 VirtualDataItem item;
1206 for (int i = 0; i < dataSize; i++) {
1207 key.push_back(i);
1208 value.push_back(i);
1209 g_deviceB->GetData(key, item);
1210 EXPECT_EQ(item.value, value);
1211 key.pop_back();
1212 value.pop_back();
1213 }
1214 EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1215 // reset sendRequestCount to 0
1216 sendRequestCount = 0;
1217 };
1218 if (isQuerySync) {
1219 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, completeCallBack, query, false);
1220 } else {
1221 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, completeCallBack);
1222 }
1223 ASSERT_TRUE(status == OK);
1224 }
1225
1226 /**
1227 * @tc.name: QuerySyncMergeCheck001
1228 * @tc.desc: Test query push sync task merge, task can be merged when there is no change in db since last query sync
1229 * @tc.type: FUNC
1230 * @tc.require: AR000F3OOV
1231 * @tc.author: zhangshijie
1232 */
1233 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck001, TestSize.Level3)
1234 {
1235 std::vector<std::string> devices;
1236 int sendRequestCount = 0;
1237 devices.push_back(g_deviceB->GetDeviceId());
1238
1239 Key key {'1'};
1240 Value value {'1'};
1241 Query query = Query::Select().PrefixKey(key);
1242 PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1243
1244 /**
1245 * @tc.steps: step3. deviceA call query sync and don't wait
1246 * @tc.expected: step3. sync should return OK.
1247 */
1248 DBStatus status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon5f4525661802(const std::map<std::string, DBStatus>& statusMap) 1249 [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1250 /**
1251 * @tc.expected: when the second sync task return, sendRequestCount should be 0, because this merge can be
1252 * skipped because there is no change in db since last query sync
1253 */
1254 ASSERT_TRUE(statusMap.size() == devices.size());
1255 for (const auto &pair : statusMap) {
1256 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1257 EXPECT_TRUE(pair.second == OK);
1258 }
1259 }, query, false);
1260 ASSERT_TRUE(status == OK);
1261 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1262 EXPECT_EQ(sendRequestCount, 0);
1263 }
1264
1265 /**
1266 * @tc.name: QuerySyncMergeCheck002
1267 * @tc.desc: Test query push sync task merge, task can not be merged when there is change in db since last sync
1268 * @tc.type: FUNC
1269 * @tc.require: AR000F3OOV
1270 * @tc.author: zhangshijie
1271 */
1272 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck002, TestSize.Level3)
1273 {
1274 std::vector<std::string> devices;
1275 int sendRequestCount = 0;
1276 devices.push_back(g_deviceB->GetDeviceId());
1277
1278 Key key {'1'};
1279 Value value {'1'};
1280 Query query = Query::Select().PrefixKey(key);
1281 PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1282
1283 /**
1284 * @tc.steps: step3. deviceA call query sync and don't wait
1285 * @tc.expected: step3. sync should return OK.
1286 */
1287 Value value3{'3'};
1288 DBStatus status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon5f4525661902(const std::map<std::string, DBStatus>& statusMap) 1289 [&sendRequestCount, devices, key, value3, this](const std::map<std::string, DBStatus>& statusMap) {
1290 /**
1291 * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1292 * skipped when there is change in db since last query sync, deviceB have {k1, v1'}
1293 */
1294 ASSERT_TRUE(statusMap.size() == devices.size());
1295 for (const auto &pair : statusMap) {
1296 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1297 EXPECT_TRUE(pair.second == OK);
1298 }
1299 VirtualDataItem item;
1300 g_deviceB->GetData(key, item);
1301 EXPECT_TRUE(item.value == value3);
1302 EXPECT_EQ(sendRequestCount, 1);
1303 }, query, false);
1304 ASSERT_TRUE(status == OK);
1305
1306 /**
1307 * @tc.steps: step4. deviceA put {k1, v1'}
1308 * @tc.steps: step4. reset sendRequestCount to 0, deviceA call sync and wait
1309 * @tc.expected: step4. sync should return OK, and sendRequestCount should be 1, because this merge can not
1310 * be skipped
1311 */
1312 while (sendRequestCount < TWO_CNT) {
1313 std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1314 }
1315 g_kvDelegatePtr->Put(key, value3);
1316 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1317 }
1318
1319 /**
1320 * @tc.name: QuerySyncMergeCheck003
1321 * @tc.desc: Test query push sync task merge, task can not be merged when then query id is different
1322 * @tc.type: FUNC
1323 * @tc.require: AR000F3OOV
1324 * @tc.author: zhangshijie
1325 */
1326 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck003, TestSize.Level3)
1327 {
1328 std::vector<std::string> devices;
1329 int sendRequestCount = 0;
1330 devices.push_back(g_deviceB->GetDeviceId());
1331
1332 Key key {'1'};
1333 Value value {'1'};
1334 PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1335
1336 /**
1337 * @tc.steps: step3. deviceA call another query sync
1338 * @tc.expected: step3. sync should return OK.
1339 */
1340 Key key2 = {'2'};
1341 Value value2 = {'2'};
1342 DBStatus status = g_kvDelegatePtr->Put(key2, value2);
1343 ASSERT_TRUE(status == OK);
1344 Query query2 = Query::Select().PrefixKey(key2);
1345 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon5f4525661a02(const std::map<std::string, DBStatus>& statusMap) 1346 [&sendRequestCount, key2, value2, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1347 /**
1348 * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1349 * skipped, deviceB have {k2,v2}
1350 */
1351 ASSERT_TRUE(statusMap.size() == devices.size());
1352 for (const auto &pair : statusMap) {
1353 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1354 EXPECT_TRUE(pair.second == OK);
1355 }
1356 VirtualDataItem item;
1357 g_deviceB->GetData(key2, item);
1358 EXPECT_TRUE(item.value == value2);
1359 EXPECT_EQ(sendRequestCount, 1);
1360 }, query2, false);
1361 ASSERT_TRUE(status == OK);
1362 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1363 }
1364
1365 /**
1366 * @tc.name: QuerySyncMergeCheck004
1367 * @tc.desc: Test query push sync task merge, task can be merged when there is no change in db since last push sync
1368 * @tc.type: FUNC
1369 * @tc.require: AR000F3OOV
1370 * @tc.author: zhangshijie
1371 */
1372 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck004, TestSize.Level3)
1373 {
1374 DBStatus status = OK;
1375 std::vector<std::string> devices;
1376 devices.push_back(g_deviceB->GetDeviceId());
1377
1378 Key key {'1'};
1379 Value value {'1'};
1380 int sendRequestCount = 0;
1381 PrePareForQuerySyncMergeTest(false, devices, key, value, sendRequestCount);
1382
1383 /**
1384 * @tc.steps: step3. deviceA call query sync without any change in db
1385 * @tc.expected: step3. sync should return OK, and sendRequestCount should be 0, because this merge can be skipped
1386 */
1387 Query query = Query::Select().PrefixKey(key);
1388 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon5f4525661b02(const std::map<std::string, DBStatus>& statusMap) 1389 [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1390 /**
1391 * @tc.expected step3: when the second sync task return, sendRequestCount should be 0, because this merge
1392 * can be skipped because there is no change in db since last push sync
1393 */
1394 ASSERT_TRUE(statusMap.size() == devices.size());
1395 for (const auto &pair : statusMap) {
1396 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1397 EXPECT_TRUE(pair.second == OK);
1398 }
1399 }, query, false);
1400 ASSERT_TRUE(status == OK);
1401 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1402 EXPECT_EQ(sendRequestCount, 0);
1403 }
1404
1405 /**
1406 * @tc.name: GetDataNotify001
1407 * @tc.desc: Test GetDataNotify function, delay < 30s should sync ok, > 36 should timeout
1408 * @tc.type: FUNC
1409 * @tc.require: AR000D4876
1410 * @tc.author: zhangqiquan
1411 */
1412 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify001, TestSize.Level3)
1413 {
1414 ASSERT_NE(g_kvDelegatePtr, nullptr);
1415 DBStatus status = OK;
1416 std::vector<std::string> devices;
1417 devices.push_back(g_deviceB->GetDeviceId());
1418 const std::string DEVICE_A = "real_device";
1419 /**
1420 * @tc.steps: step1. deviceB set get data delay 40s
1421 */
1422 g_deviceB->DelayGetSyncData(WAIT_40_SECONDS);
1423 g_communicatorAggregator->SetTimeout(DEVICE_A, TIMEOUT_6_SECONDS);
1424
1425 /**
1426 * @tc.steps: step2. deviceA call sync and wait
1427 * @tc.expected: step2. sync should return OK. onComplete should be called, deviceB sync TIME_OUT.
1428 */
1429 std::map<std::string, DBStatus> result;
1430 std::map<std::string, int> virtualRes;
1431 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1432 EXPECT_EQ(status, OK);
1433 EXPECT_EQ(result.size(), devices.size());
1434 EXPECT_EQ(result[DEVICE_B], TIME_OUT);
1435 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1436 Query query = Query::Select();
__anon5f4525661c02(std::map<std::string, int> resMap) 1437 g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1438 virtualRes = std::move(resMap);
1439 }, true);
1440 EXPECT_EQ(virtualRes.size(), devices.size());
1441 EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_TIMEOUT));
1442 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1443
1444 /**
1445 * @tc.steps: step3. deviceB set get data delay 30s
1446 */
1447 g_deviceB->DelayGetSyncData(WAIT_30_SECONDS);
1448 /**
1449 * @tc.steps: step4. deviceA call sync and wait
1450 * @tc.expected: step4. sync should return OK. onComplete should be called, deviceB sync OK.
1451 */
1452 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1453 EXPECT_EQ(status, OK);
1454 EXPECT_EQ(result.size(), devices.size());
1455 EXPECT_EQ(result[DEVICE_B], OK);
1456 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
__anon5f4525661d02(std::map<std::string, int> resMap) 1457 g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1458 virtualRes = std::move(resMap);
1459 }, true);
1460 EXPECT_EQ(virtualRes.size(), devices.size());
1461 EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_FINISHED_ALL));
1462 g_deviceB->DelayGetSyncData(0);
1463 }
1464
1465 /**
1466 * @tc.name: GetDataNotify002
1467 * @tc.desc: Test GetDataNotify function, two device sync each other at same time
1468 * @tc.type: FUNC
1469 * @tc.require: AR000D4876
1470 * @tc.author: zhangqiquan
1471 */
1472 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify002, TestSize.Level3)
1473 {
1474 ASSERT_NE(g_kvDelegatePtr, nullptr);
1475 DBStatus status = OK;
1476 std::vector<std::string> devices;
1477 devices.push_back(g_deviceB->GetDeviceId());
1478 const std::string DEVICE_A = "real_device";
1479
1480 /**
1481 * @tc.steps: step1. deviceA sync first to finish time sync and ability sync
1482 */
1483 std::map<std::string, DBStatus> result;
1484 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1485 EXPECT_EQ(status, OK);
1486 EXPECT_EQ(result.size(), devices.size());
1487 EXPECT_EQ(result[DEVICE_B], OK);
1488 /**
1489 * @tc.steps: step2. deviceB set get data delay 30s
1490 */
1491 g_deviceB->DelayGetSyncData(WAIT_30_SECONDS);
1492
1493 /**
1494 * @tc.steps: step3. deviceB call sync and wait
1495 */
__anon5f4525661e02() 1496 std::thread asyncThread([]() {
1497 std::map<std::string, int> virtualRes;
1498 Query query = Query::Select();
1499 g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1500 virtualRes = std::move(resMap);
1501 }, true);
1502 });
1503
1504 /**
1505 * @tc.steps: step4. deviceA call sync and wait
1506 * @tc.expected: step4. sync should return OK. because notify timer trigger (30s - 1s)/2s => 15times
1507 */
1508 std::this_thread::sleep_for(std::chrono::seconds(1));
1509 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1510 EXPECT_EQ(status, OK);
1511 EXPECT_EQ(result.size(), devices.size());
1512 EXPECT_EQ(result[DEVICE_B], OK);
1513 asyncThread.join();
1514 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1515 }