1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17
18 #include "ability_sync.h"
19 #include "distributeddb_data_generate_unit_test.h"
20 #include "distributeddb_tools_unit_test.h"
21 #include "kv_store_nb_delegate_impl.h"
22 #include "kv_virtual_device.h"
23 #include "platform_specific.h"
24 #include "process_system_api_adapter_impl.h"
25 #include "single_ver_data_packet.h"
26 #include "virtual_communicator_aggregator.h"
27
28 using namespace testing::ext;
29 using namespace DistributedDB;
30 using namespace DistributedDBUnitTest;
31 using namespace std;
32
33 namespace {
34 string g_testDir;
35 const string STORE_ID = "kv_stroe_sync_check_test";
36 const std::string DEVICE_B = "deviceB";
37 const std::string DEVICE_C = "deviceC";
38 const int LOCAL_WATER_MARK_NOT_INIT = 0xaa;
39 const int EIGHT_HUNDRED = 800;
40 const int NORMAL_SYNC_SEND_REQUEST_CNT = 3;
41 const int TWO_CNT = 2;
42 const int SLEEP_MILLISECONDS = 500;
43 const int TEN_SECONDS = 10;
44 const int THREE_HUNDRED = 300;
45 const int WAIT_30_SECONDS = 30000;
46 const int WAIT_40_SECONDS = 40000;
47 const int TIMEOUT_6_SECONDS = 6000;
48
49 KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
50 KvStoreConfig g_config;
51 DistributedDBToolsUnitTest g_tool;
52 DBStatus g_kvDelegateStatus = INVALID_ARGS;
53 KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
54 VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
55 KvVirtualDevice* g_deviceB = nullptr;
56 KvVirtualDevice* g_deviceC = nullptr;
57 VirtualSingleVerSyncDBInterface *g_syncInterfaceB = nullptr;
58 VirtualSingleVerSyncDBInterface *g_syncInterfaceC = nullptr;
59
60 // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
61 auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
62 placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
63 #ifndef LOW_LEVEL_MEM_DEV
64 const int KEY_LEN = 20; // 20 Bytes
65 const int VALUE_LEN = 4 * 1024 * 1024; // 4MB
66 const int ENTRY_NUM = 2; // 16 entries
67 #endif
68
69 class DistributedDBSingleVerP2PSyncCheckTest : public testing::Test {
70 public:
71 static void SetUpTestCase(void);
72 static void TearDownTestCase(void);
73 void SetUp();
74 void TearDown();
75 void CancelTestInit(DeviceSyncOption &option, std::vector<Entry> &entries, const uint32_t mtuSize);
76 void CancelTestEnd(std::vector<Entry> &entries, const uint32_t mtuSize);
77 };
78
SetUpTestCase(void)79 void DistributedDBSingleVerP2PSyncCheckTest::SetUpTestCase(void)
80 {
81 /**
82 * @tc.setup: Init datadir and Virtual Communicator.
83 */
84 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
85 g_config.dataDir = g_testDir;
86 g_mgr.SetKvStoreConfig(g_config);
87
88 string dir = g_testDir + "/single_ver";
89 DIR* dirTmp = opendir(dir.c_str());
90 if (dirTmp == nullptr) {
91 OS::MakeDBDirectory(dir);
92 } else {
93 closedir(dirTmp);
94 }
95
96 g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
97 ASSERT_TRUE(g_communicatorAggregator != nullptr);
98 RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
99
100 std::shared_ptr<ProcessSystemApiAdapterImpl> g_adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
101 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(g_adapter);
102 }
103
TearDownTestCase(void)104 void DistributedDBSingleVerP2PSyncCheckTest::TearDownTestCase(void)
105 {
106 /**
107 * @tc.teardown: Release virtual Communicator and clear data dir.
108 */
109 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
110 LOGE("rm test db files error!");
111 }
112 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
113 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
114 }
115
SetUp(void)116 void DistributedDBSingleVerP2PSyncCheckTest::SetUp(void)
117 {
118 DistributedDBToolsUnitTest::PrintTestCaseInfo();
119 /**
120 * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
121 */
122 KvStoreNbDelegate::Option option;
123 option.secOption.securityLabel = SecurityLabel::S3;
124 option.secOption.securityFlag = SecurityFlag::SECE;
125 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
126 ASSERT_TRUE(g_kvDelegateStatus == OK);
127 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
128 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
129 ASSERT_TRUE(g_deviceB != nullptr);
130 g_syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
131 ASSERT_TRUE(g_syncInterfaceB != nullptr);
132 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, g_syncInterfaceB), E_OK);
133 SecurityOption virtualOption;
134 virtualOption.securityLabel = option.secOption.securityLabel;
135 virtualOption.securityFlag = option.secOption.securityFlag;
136 g_syncInterfaceB->SetSecurityOption(virtualOption);
137
138 g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
139 ASSERT_TRUE(g_deviceC != nullptr);
140 g_syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
141 ASSERT_TRUE(g_syncInterfaceC != nullptr);
142 ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, g_syncInterfaceC), E_OK);
143 g_syncInterfaceC->SetSecurityOption(virtualOption);
144 RuntimeContext::GetInstance()->ClearAllDeviceTimeInfo();
145 }
146
TearDown(void)147 void DistributedDBSingleVerP2PSyncCheckTest::TearDown(void)
148 {
149 /**
150 * @tc.teardown: Release device A, B, C
151 */
152 if (g_kvDelegatePtr != nullptr) {
153 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
154 g_kvDelegatePtr = nullptr;
155 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
156 LOGD("delete kv store status %d", status);
157 ASSERT_TRUE(status == OK);
158 }
159 if (g_deviceB != nullptr) {
160 delete g_deviceB;
161 g_deviceB = nullptr;
162 }
163 if (g_deviceC != nullptr) {
164 delete g_deviceC;
165 g_deviceC = nullptr;
166 }
167 if (g_communicatorAggregator != nullptr) {
168 g_communicatorAggregator->RegOnDispatch(nullptr);
169 }
170 }
171
172 /**
173 * @tc.name: sec option check Sync 001
174 * @tc.desc: if sec option not equal, forbid sync
175 * @tc.type: FUNC
176 * @tc.require:
177 * @tc.author: wangchuanqing
178 */
179 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck001, TestSize.Level1)
180 {
181 DBStatus status = OK;
182 std::vector<std::string> devices;
183 devices.push_back(g_deviceB->GetDeviceId());
184 devices.push_back(g_deviceC->GetDeviceId());
185
186 /**
187 * @tc.steps: step1. deviceA put {k1, v1}
188 */
189 Key key = {'1'};
190 Value value = {'1'};
191 status = g_kvDelegatePtr->Put(key, value);
192 ASSERT_TRUE(status == OK);
193
194 ASSERT_TRUE(g_syncInterfaceB != nullptr);
195 ASSERT_TRUE(g_syncInterfaceC != nullptr);
196 SecurityOption secOption{SecurityLabel::S4, SecurityFlag::ECE};
197 g_syncInterfaceB->SetSecurityOption(secOption);
198 g_syncInterfaceC->SetSecurityOption(secOption);
199
200 /**
201 * @tc.steps: step2. deviceA call sync and wait
202 * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
203 */
204 std::map<std::string, DBStatus> result;
205 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
206 ASSERT_TRUE(status == OK);
207
208 ASSERT_TRUE(result.size() == devices.size());
209 for (const auto &pair : result) {
210 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
211 EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
212 }
213 VirtualDataItem item;
214 g_deviceB->GetData(key, item);
215 EXPECT_TRUE(item.value.empty());
216 g_deviceC->GetData(key, item);
217 EXPECT_TRUE(item.value.empty());
218 }
219
220 /**
221 * @tc.name: sec option check Sync 002
222 * @tc.desc: if sec option not equal, forbid sync
223 * @tc.type: FUNC
224 * @tc.require:
225 * @tc.author: wangchuanqing
226 */
227 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck002, TestSize.Level1)
228 {
229 DBStatus status = OK;
230 std::vector<std::string> devices;
231 devices.push_back(g_deviceB->GetDeviceId());
232 devices.push_back(g_deviceC->GetDeviceId());
233
234 /**
235 * @tc.steps: step1. deviceA put {k1, v1}
236 */
237 Key key = {'1'};
238 Value value = {'1'};
239 status = g_kvDelegatePtr->Put(key, value);
240 ASSERT_TRUE(status == OK);
241
242 ASSERT_TRUE(g_syncInterfaceC != nullptr);
243 SecurityOption secOption{SecurityLabel::S4, SecurityFlag::ECE};
244 g_syncInterfaceC->SetSecurityOption(secOption);
245 secOption.securityLabel = SecurityLabel::S3;
246 secOption.securityFlag = SecurityFlag::SECE;
247 g_syncInterfaceB->SetSecurityOption(secOption);
248
249 /**
250 * @tc.steps: step2. deviceA call sync and wait
251 * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
252 */
253 std::map<std::string, DBStatus> result;
254 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
255 ASSERT_TRUE(status == OK);
256
257 ASSERT_TRUE(result.size() == devices.size());
258 for (const auto &pair : result) {
259 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
260 if (pair.first == DEVICE_B) {
261 EXPECT_TRUE(pair.second == OK);
262 } else {
263 EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
264 }
265 }
266 VirtualDataItem item;
267 g_deviceC->GetData(key, item);
268 EXPECT_TRUE(item.value.empty());
269 g_deviceB->GetData(key, item);
270 EXPECT_TRUE(item.value == value);
271 }
272
273 /**
274 * @tc.name: sec option check Sync 003
275 * @tc.desc: if sec option equal, check not pass, forbid sync
276 * @tc.type: FUNC
277 * @tc.require:
278 * @tc.author: zhangqiquan
279 */
280 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck003, TestSize.Level1)
281 {
282 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
283 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anonde2bd9670202(const std::string &, const SecurityOption &) 284 adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
285 return false;
286 });
287 /**
288 * @tc.steps: step1. record packet
289 * @tc.expected: step1. sync should failed in source.
290 */
291 std::atomic<int> messageCount = 0;
__anonde2bd9670302(const std::string &, Message *) 292 g_communicatorAggregator->RegOnDispatch([&messageCount](const std::string &, Message *) {
293 messageCount++;
294 });
295 /**
296 * @tc.steps: step2. deviceA call sync and wait
297 * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
298 */
299 DBStatus status = OK;
300 std::vector<std::string> devices;
301 devices.push_back(g_deviceB->GetDeviceId());
302 std::map<std::string, DBStatus> result;
303 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
304 EXPECT_EQ(status, OK);
305 EXPECT_EQ(messageCount, 4); // 4 = 2 time sync + 2 ability sync
306 for (const auto &pair : result) {
307 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
308 EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
309 }
310 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
311 g_communicatorAggregator->RegOnDispatch(nullptr);
312 }
313
314 /**
315 * @tc.name: sec option check Sync 004
316 * @tc.desc: memory db not check device security
317 * @tc.type: FUNC
318 * @tc.require:
319 * @tc.author: zhangqiquan
320 */
321 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck004, TestSize.Level1)
322 {
323 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
324 g_kvDelegatePtr = nullptr;
325 KvStoreNbDelegate::Option option;
326 option.secOption.securityLabel = SecurityLabel::NOT_SET;
327 option.isMemoryDb = true;
328 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
329 ASSERT_TRUE(g_kvDelegateStatus == OK);
330 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
331
332 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
333 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anonde2bd9670402(const std::string &, const SecurityOption &) 334 adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
335 return false;
336 });
__anonde2bd9670502(const std::string &, SecurityOption &securityOption) 337 adapter->ForkGetSecurityOption([](const std::string &, SecurityOption &securityOption) {
338 securityOption.securityLabel = NOT_SET;
339 return OK;
340 });
__anonde2bd9670602(SecurityOption &) 341 g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &) {
342 return -E_NOT_SUPPORT;
343 });
344
345 std::vector<std::string> devices;
346 devices.push_back(g_deviceB->GetDeviceId());
347 std::map<std::string, DBStatus> result;
348 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
349 EXPECT_EQ(status, OK);
350 for (const auto &pair : result) {
351 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
352 EXPECT_TRUE(pair.second == OK);
353 }
354
355 adapter->ForkCheckDeviceSecurityAbility(nullptr);
356 adapter->ForkGetSecurityOption(nullptr);
357 g_syncInterfaceB->ForkGetSecurityOption(nullptr);
358 }
359
360 /**
361 * @tc.name: sec option check Sync 005
362 * @tc.desc: if sec option equal, check not pass, forbid sync
363 * @tc.type: FUNC
364 * @tc.require:
365 * @tc.author: zhangqiquan
366 */
367 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck005, TestSize.Level1)
368 {
369 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
370 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anonde2bd9670702(SecurityOption &option) 371 g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &option) {
372 option.securityLabel = NOT_SET;
373 return E_OK;
374 });
__anonde2bd9670802(const std::string &, SecurityOption &securityOption) 375 adapter->ForkGetSecurityOption([](const std::string &, SecurityOption &securityOption) {
376 securityOption.securityLabel = NOT_SET;
377 return OK;
378 });
379
380 std::vector<std::string> devices;
381 devices.push_back(g_deviceB->GetDeviceId());
382 std::map<std::string, DBStatus> result;
383 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
384 EXPECT_EQ(status, OK);
385 for (const auto &pair : result) {
386 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
387 EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
388 }
389
390 adapter->ForkCheckDeviceSecurityAbility(nullptr);
391 adapter->ForkGetSecurityOption(nullptr);
392 g_syncInterfaceB->ForkGetSecurityOption(nullptr);
393 }
394
395 /**
396 * @tc.name: sec option check Sync 006
397 * @tc.desc: memory db not check device security
398 * @tc.type: FUNC
399 * @tc.require:
400 * @tc.author: zhangqiquan
401 */
402 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck006, TestSize.Level0)
403 {
404 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
405 ASSERT_EQ(g_mgr.DeleteKvStore(STORE_ID), OK);
406 g_kvDelegatePtr = nullptr;
407 KvStoreNbDelegate::Option option;
408 option.secOption.securityLabel = SecurityLabel::S1;
409 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
410 ASSERT_TRUE(g_kvDelegateStatus == OK);
411 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
412
413 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
414 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anonde2bd9670902(const std::string &, const SecurityOption &) 415 adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
416 return true;
417 });
__anonde2bd9670a02(const std::string &, SecurityOption &securityOption) 418 adapter->ForkGetSecurityOption([](const std::string &, SecurityOption &securityOption) {
419 securityOption.securityLabel = S1;
420 return OK;
421 });
__anonde2bd9670b02(SecurityOption &option) 422 g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &option) {
423 option.securityLabel = SecurityLabel::S0;
424 return E_OK;
425 });
426
427 std::vector<std::string> devices;
428 devices.push_back(g_deviceB->GetDeviceId());
429 std::map<std::string, DBStatus> result;
430 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
431 EXPECT_EQ(status, OK);
432 for (const auto &pair : result) {
433 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
434 EXPECT_TRUE(pair.second == OK);
435 }
436
437 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(std::make_shared<ProcessSystemApiAdapterImpl>());
438 g_syncInterfaceB->ForkGetSecurityOption(nullptr);
439 }
440
441 /**
442 * @tc.name: sec option check Sync 007
443 * @tc.desc: sync should send security option
444 * @tc.type: FUNC
445 * @tc.require:
446 * @tc.author: zhangqiquan
447 */
448 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck007, TestSize.Level0)
449 {
450 /**
451 * @tc.steps: step1. fork check device security ability
452 * @tc.expected: step1. check param option should be S3 SECE.
453 */
454 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
455 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anonde2bd9670c02(const std::string &, const SecurityOption &option) 456 adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &option) {
457 EXPECT_EQ(option.securityLabel, SecurityLabel::S3);
458 EXPECT_EQ(option.securityFlag, SecurityFlag::SECE);
459 return true;
460 });
461 /**
462 * @tc.steps: step2. sync twice
463 * @tc.expected: step2. sync success.
464 */
465 std::vector<std::string> devices;
466 devices.push_back(g_deviceB->GetDeviceId());
467 std::map<std::string, DBStatus> result;
468 g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
469 auto status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
470 ASSERT_TRUE(status == OK);
471 ASSERT_TRUE(result.size() == devices.size());
472 for (const auto &pair : result) {
473 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
474 EXPECT_TRUE(pair.second == OK);
475 }
476 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
477 }
478
479 /**
480 * @tc.name: SecOptionCheck008
481 * @tc.desc: pull compress sync when check device ability fail
482 * @tc.type: FUNC
483 * @tc.require:
484 * @tc.author: zhangqiquan
485 */
486 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck008, TestSize.Level0)
487 {
488 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
489 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
490 auto deviceB = g_deviceB->GetDeviceId();
__anonde2bd9670d02(const std::string &dev, const SecurityOption &) 491 adapter->ForkCheckDeviceSecurityAbility([deviceB](const std::string &dev, const SecurityOption &) {
492 if (dev != "real_device") {
493 return true;
494 }
495 return false;
496 });
__anonde2bd9670e02(SecurityOption &option) 497 g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &option) {
498 option.securityLabel = SecurityLabel::S3;
499 option.securityFlag = SecurityFlag::SECE;
500 return E_OK;
501 });
502 g_syncInterfaceB->SetCompressSync(true);
503 std::vector<std::string> devices;
504 devices.push_back(deviceB);
505 std::map<std::string, DBStatus> result;
506 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
507 EXPECT_EQ(status, OK);
508 for (const auto &pair : result) {
509 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
510 EXPECT_EQ(pair.second, SECURITY_OPTION_CHECK_ERROR);
511 }
512
513 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(std::make_shared<ProcessSystemApiAdapterImpl>());
514 g_syncInterfaceB->ForkGetSecurityOption(nullptr);
515 g_syncInterfaceB->SetCompressSync(false);
516 }
517
518 /**
519 * @tc.name: SyncProcess001
520 * @tc.desc: sync process pull mode.
521 * @tc.type: FUNC
522 * @tc.require:
523 * @tc.author: chenghuitao
524 */
525 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncProcess001, TestSize.Level1)
526 {
527 std::vector<std::string> devices;
528 devices.push_back(g_deviceB->GetDeviceId());
529 devices.push_back(g_deviceC->GetDeviceId());
530
531 /**
532 * @tc.steps: step1. deviceB deviceC put bigData
533 */
534 std::vector<Entry> entries;
535 const int dataCount = 10;
536 DistributedDBUnitTest::GenerateNumberEntryVector(dataCount, entries);
537
538 for (uint32_t i = 0; i < entries.size(); i++) {
539 if (i % 2 == 0) {
540 g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
541 } else {
542 g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
543 }
544 }
545
546 /**
547 * @tc.steps: step2. deviceA call pull sync
548 * @tc.expected: step2. sync should return OK.
549 */
550 std::map<std::string, DeviceSyncProcess> syncProcessMap;
551 DeviceSyncOption option;
552 option.devices = devices;
553 option.mode = SYNC_MODE_PULL_ONLY;
554 option.isQuery = false;
555 option.isWait = false;
556 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, option, syncProcessMap);
557 EXPECT_EQ(status, DBStatus::OK);
558
559 /**
560 * @tc.expected: step3. onProcess should be called, DeviceA have all bigData
561 */
562 for (const auto &entry : entries) {
563 Value value;
564 EXPECT_EQ(g_kvDelegatePtr->Get(entry.key, value), DBStatus::OK);
565 EXPECT_EQ(value, entry.value);
566 }
567
568 for (const auto &entry : syncProcessMap) {
569 LOGD("[SyncProcess001] dev %s, status %d, totalCount %u, finishedCount %u", entry.first.c_str(),
570 entry.second.errCode, entry.second.pullInfo.total, entry.second.pullInfo.finishedCount);
571 EXPECT_EQ(entry.second.errCode, OK);
572 EXPECT_EQ(entry.second.process, ProcessStatus::FINISHED);
573 EXPECT_EQ(entry.second.pullInfo.total, static_cast<uint32_t>(dataCount / 2));
574 EXPECT_EQ(entry.second.pullInfo.finishedCount, static_cast<uint32_t>(dataCount / 2));
575 ASSERT_TRUE(entry.second.syncId > 0);
576 }
577 }
578
579 /**
580 * @tc.name: SyncProcess002
581 * @tc.desc: sync process pull mode.
582 * @tc.type: FUNC
583 * @tc.require:
584 * @tc.author: chenghuitao
585 */
586 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncProcess002, TestSize.Level1)
587 {
588 /**
589 * @tc.steps: step1. deviceA put bigData
590 */
591 std::vector<Entry> entries;
592 const int dataCount = 10;
593 DistributedDBUnitTest::GenerateNumberEntryVector(dataCount, entries);
594
595 for (uint32_t i = 0; i < entries.size(); i++) {
596 g_kvDelegatePtr->Put(entries[i].key, entries[i].value);
597 }
598
599 /**
600 * @tc.steps: step2. virtual deviceB call pull sync
601 * @tc.expected: step2. sync should return OK.
602 */
603 std::map<std::string, DeviceSyncProcess> syncProcessMap;
604 DeviceSyncOption option;
605 option.mode = SYNC_MODE_PULL_ONLY;
606 option.isQuery = false;
607 option.isWait = true;
608 uint32_t processCount = 0;
609 std::vector<ProcessStatus> statuses = {ProcessStatus::PREPARED, ProcessStatus::PROCESSING, ProcessStatus::FINISHED};
610 DeviceSyncProcessCallback onProcess =
__anonde2bd9670f02(const std::map<std::string, DeviceSyncProcess> &processMap) 611 [&](const std::map<std::string, DeviceSyncProcess> &processMap) {
612 syncProcessMap = processMap;
613 for (const auto &entry : processMap) {
614 LOGD("[SyncProcess002-onProcess] dev %s, status %d, process %d", entry.first.c_str(),
615 entry.second.errCode, entry.second.process);
616 EXPECT_EQ(entry.second.errCode, DBStatus::OK);
617 EXPECT_EQ(entry.second.process, statuses[processCount]);
618 // total and finishedCount must be greater than 0 when processing
619 if (entry.second.process == ProcessStatus::PROCESSING) {
620 EXPECT_TRUE(entry.second.pullInfo.total > 0);
621 EXPECT_TRUE(entry.second.pullInfo.finishedCount > 0);
622 }
623 }
624 processCount++;
625 };
626 int status = g_deviceB->Sync(option, onProcess);
627 EXPECT_EQ(status, E_OK);
628
629 /**
630 * @tc.expected: step3. onProcess should be called, DeviceB have all bigData
631 */
632 for (const auto &entry : entries) {
633 VirtualDataItem item;
634 EXPECT_EQ(g_deviceB->GetData(entry.key, item), E_OK);
635 EXPECT_EQ(item.value, entry.value);
636 }
637
638 for (const auto &entry : syncProcessMap) {
639 LOGD("[SyncProcess002] dev %s, status %d, totalCount %u, finishedCount %u", entry.first.c_str(),
640 entry.second.errCode, entry.second.pullInfo.total, entry.second.pullInfo.finishedCount);
641 EXPECT_EQ(entry.second.errCode, DBStatus::OK);
642 EXPECT_EQ(entry.second.process, ProcessStatus::FINISHED);
643 EXPECT_EQ(entry.second.pullInfo.total, static_cast<uint32_t>(dataCount));
644 EXPECT_EQ(entry.second.pullInfo.finishedCount, static_cast<uint32_t>(dataCount));
645 ASSERT_TRUE(entry.second.syncId > 0);
646 }
647 }
648
649 /**
650 * @tc.name: SyncProcess003
651 * @tc.desc: sync process pull mode with QUERY.
652 * @tc.type: FUNC
653 * @tc.require:
654 * @tc.author: chenghuitao
655 */
656 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncProcess003, TestSize.Level1)
657 {
658 /**
659 * @tc.steps: step1. deviceA put bigData
660 */
661 std::vector<Entry> entries;
662 const int dataCount = 10;
663 DistributedDBUnitTest::GenerateNumberEntryVector(dataCount, entries);
664
665 for (uint32_t i = 0; i < entries.size(); i++) {
666 g_kvDelegatePtr->Put(entries[i].key, entries[i].value);
667 }
668
669 /**
670 * @tc.steps: step2. virtual deviceB call pull sync
671 * @tc.expected: step2. sync should return OK.
672 */
673 std::map<std::string, DeviceSyncProcess> syncProcessMap;
674 DeviceSyncOption option;
675 option.mode = SYNC_MODE_PULL_ONLY;
676 option.isQuery = true;
677 option.isWait = true;
678 option.query = Query::Select().Limit(5);
679 DeviceSyncProcessCallback onProcess =
__anonde2bd9671002(const std::map<std::string, DeviceSyncProcess> &processMap) 680 [&syncProcessMap, this](const std::map<std::string, DeviceSyncProcess> &processMap) {
681 syncProcessMap = processMap;
682 };
683 int status = g_deviceB->Sync(option, onProcess);
684 EXPECT_EQ(status, E_OK);
685
686 /**
687 * @tc.expected: step3. onProcess should be called, DeviceB have all bigData
688 */
689 for (const auto &entry : std::vector<Entry>(entries.begin(), entries.begin() + 5)) {
690 VirtualDataItem item;
691 EXPECT_EQ(g_deviceB->GetData(entry.key, item), E_OK);
692 EXPECT_EQ(item.value, entry.value);
693 }
694
695 for (const auto &entry : syncProcessMap) {
696 LOGD("[SyncProcess003] dev %s, status %d, totalCount %u, finishedCount %u", entry.first.c_str(),
697 entry.second.errCode, entry.second.pullInfo.total, entry.second.pullInfo.finishedCount);
698 EXPECT_EQ(entry.second.errCode, DBStatus::OK);
699 EXPECT_EQ(entry.second.process, ProcessStatus::FINISHED);
700 EXPECT_EQ(entry.second.pullInfo.total, 5u);
701 EXPECT_EQ(entry.second.pullInfo.finishedCount, 5u);
702 ASSERT_TRUE(entry.second.syncId > 0);
703 }
704 }
705
706 #ifndef LOW_LEVEL_MEM_DEV
707 /**
708 * @tc.name: BigDataSync001
709 * @tc.desc: big data sync push mode.
710 * @tc.type: FUNC
711 * @tc.require:
712 * @tc.author: wangchuanqing
713 */
714 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync001, TestSize.Level1)
715 {
716 DBStatus status = OK;
717 std::vector<std::string> devices;
718 devices.push_back(g_deviceB->GetDeviceId());
719 devices.push_back(g_deviceC->GetDeviceId());
720
721 /**
722 * @tc.steps: step1. deviceA put 16 bigData
723 */
724 std::vector<Entry> entries;
725 std::vector<Key> keys;
726 DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
727 for (const auto &entry : entries) {
728 status = g_kvDelegatePtr->Put(entry.key, entry.value);
729 ASSERT_TRUE(status == OK);
730 }
731
732 /**
733 * @tc.steps: step2. deviceA call sync and wait
734 * @tc.expected: step2. sync should return OK.
735 */
736 std::map<std::string, DBStatus> result;
737 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
738 ASSERT_TRUE(status == OK);
739
740 /**
741 * @tc.expected: step2. onComplete should be called, DeviceB,C have {k1,v1}
742 */
743 ASSERT_TRUE(result.size() == devices.size());
744 for (const auto &pair : result) {
745 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
746 EXPECT_TRUE(pair.second == OK);
747 }
748 VirtualDataItem item;
749 for (const auto &entry : entries) {
750 item.value.clear();
751 g_deviceB->GetData(entry.key, item);
752 EXPECT_TRUE(item.value == entry.value);
753 item.value.clear();
754 g_deviceC->GetData(entry.key, item);
755 EXPECT_TRUE(item.value == entry.value);
756 }
757 }
758
759 /**
760 * @tc.name: BigDataSync002
761 * @tc.desc: big data sync pull mode.
762 * @tc.type: FUNC
763 * @tc.require:
764 * @tc.author: wangchuanqing
765 */
766 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync002, TestSize.Level1)
767 {
768 DBStatus status = OK;
769 std::vector<std::string> devices;
770 devices.push_back(g_deviceB->GetDeviceId());
771 devices.push_back(g_deviceC->GetDeviceId());
772
773 /**
774 * @tc.steps: step1. deviceA deviceB put bigData
775 */
776 std::vector<Entry> entries;
777 std::vector<Key> keys;
778 DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
779
780 for (uint32_t i = 0; i < entries.size(); i++) {
781 if (i % 2 == 0) {
782 g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
783 } else {
784 g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
785 }
786 }
787
788 /**
789 * @tc.steps: step3. deviceA call pull sync
790 * @tc.expected: step3. sync should return OK.
791 */
792 std::map<std::string, DBStatus> result;
793 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
794 ASSERT_TRUE(status == OK);
795
796 /**
797 * @tc.expected: step3. onComplete should be called, DeviceA have all bigData
798 */
799 ASSERT_TRUE(result.size() == devices.size());
800 for (const auto &pair : result) {
801 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
802 EXPECT_TRUE(pair.second == OK);
803 }
804 for (const auto &entry : entries) {
805 Value value;
806 EXPECT_EQ(g_kvDelegatePtr->Get(entry.key, value), OK);
807 EXPECT_EQ(value, entry.value);
808 }
809 }
810
811 /**
812 * @tc.name: BigDataSync003
813 * @tc.desc: big data sync pushAndPull mode.
814 * @tc.type: FUNC
815 * @tc.require:
816 * @tc.author: wangchuanqing
817 */
818 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync003, TestSize.Level1)
819 {
820 DBStatus status = OK;
821 std::vector<std::string> devices;
822 devices.push_back(g_deviceB->GetDeviceId());
823 devices.push_back(g_deviceC->GetDeviceId());
824
825 /**
826 * @tc.steps: step1. deviceA deviceB put bigData
827 */
828 std::vector<Entry> entries;
829 std::vector<Key> keys;
830 DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
831
832 for (uint32_t i = 0; i < entries.size(); i++) {
833 if (i % 3 == 0) { // 0 3 6 9 12 15 for deivec B
834 g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
835 } else if (i % 3 == 1) { // 1 4 7 10 13 16 for device C
836 g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
837 } else { // 2 5 8 11 14 for device A
838 status = g_kvDelegatePtr->Put(entries[i].key, entries[i].value);
839 ASSERT_TRUE(status == OK);
840 }
841 }
842
843 /**
844 * @tc.steps: step3. deviceA call pushAndpull sync
845 * @tc.expected: step3. sync should return OK.
846 */
847 std::map<std::string, DBStatus> result;
848 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
849 ASSERT_TRUE(status == OK);
850
851 /**
852 * @tc.expected: step3. onComplete should be called, DeviceA have all bigData
853 * deviceB and deviceC has deviceA data
854 */
855 ASSERT_TRUE(result.size() == devices.size());
856 for (const auto &pair : result) {
857 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
858 EXPECT_TRUE(pair.second == OK);
859 }
860
861 VirtualDataItem item;
862 for (uint32_t i = 0; i < entries.size(); i++) {
863 Value value;
864 EXPECT_EQ(g_kvDelegatePtr->Get(entries[i].key, value), OK);
865 EXPECT_EQ(value, entries[i].value);
866
867 if (i % 3 == 2) { // 2 5 8 11 14 for device A
868 item.value.clear();
869 g_deviceB->GetData(entries[i].key, item);
870 EXPECT_TRUE(item.value == entries[i].value);
871 item.value.clear();
872 g_deviceC->GetData(entries[i].key, item);
873 EXPECT_TRUE(item.value == entries[i].value);
874 }
875 }
876 }
877 #endif
878
CancelTestInit(DeviceSyncOption & option,std::vector<Entry> & entries,const uint32_t mtuSize)879 void DistributedDBSingleVerP2PSyncCheckTest::CancelTestInit(DeviceSyncOption &option, std::vector<Entry> &entries,
880 const uint32_t mtuSize)
881 {
882 option.devices.push_back(g_deviceB->GetDeviceId());
883 option.devices.push_back(g_deviceC->GetDeviceId());
884 option.mode = SYNC_MODE_PULL_ONLY;
885 option.isQuery = false;
886 option.isWait = false;
887
888 std::vector<Key> keys;
889 const uint32_t entriesSize = 14000u;
890 const int keySize = 20;
891 DistributedDBUnitTest::GenerateRecords(entriesSize, entries, keys, keySize, mtuSize);
892 for (uint32_t i = 0; i < entries.size(); i++) {
893 if (i % option.devices.size() == 0) {
894 g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
895 } else {
896 g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
897 }
898 }
899
900 g_communicatorAggregator->SetDeviceMtuSize("real_device", mtuSize);
901 g_communicatorAggregator->SetDeviceMtuSize(DEVICE_C, mtuSize);
902 g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, mtuSize);
903 }
904
CancelTestEnd(std::vector<Entry> & entries,const uint32_t mtuSize)905 void DistributedDBSingleVerP2PSyncCheckTest::CancelTestEnd(std::vector<Entry> &entries, const uint32_t mtuSize)
906 {
907 size_t syncSuccCount = 0;
908 for (uint32_t i = 0; i < entries.size(); i++) {
909 Value value;
910 if (g_kvDelegatePtr->Get(entries[i].key, value) == OK) {
911 syncSuccCount++;
912 EXPECT_EQ(value, entries[i].value);
913 }
914 }
915 EXPECT_GT(syncSuccCount, static_cast<size_t>(0));
916 EXPECT_LT(syncSuccCount, entries.size());
917 uint32_t mtu = 5u;
918 g_communicatorAggregator->SetDeviceMtuSize("real_device", mtu * mtuSize * mtuSize);
919 g_communicatorAggregator->SetDeviceMtuSize(DEVICE_C, mtu * mtuSize * mtuSize);
920 g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, mtu * mtuSize * mtuSize);
921 g_communicatorAggregator->RegBeforeDispatch(nullptr);
922 }
923
924 /**
 * @tc.name: SyncProcessCancel001
926 * @tc.desc: cancel data sync process pull mode.
927 * @tc.type: FUNC
928 * @tc.require:
929 * @tc.author: lijun
930 */
931 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncProcessCancel001, TestSize.Level0)
932 {
933 DeviceSyncOption option;
934 std::vector<Entry> entries;
935 const uint32_t mtuSize = 8u;
936 /**
937 * @tc.steps: step1. deviceB deviceC put data
938 */
939 CancelTestInit(option, entries, mtuSize);
940 uint32_t syncId;
941 std::mutex tempMutex;
942 bool isFirst = true;
__anonde2bd9671102(const std::string &dstTarget, const Message *msg) 943 g_communicatorAggregator->RegBeforeDispatch([&](const std::string &dstTarget, const Message *msg) {
944 if (dstTarget == "real_device" && msg->GetMessageType() == TYPE_REQUEST &&
945 msg->GetMessageId() == DATA_SYNC_MESSAGE) {
946 tempMutex.lock();
947 if (isFirst == true) {
948 isFirst = false;
949 /**
950 * @tc.steps: step3. cancel sync
951 * @tc.expected: step3. should return OK.
952 */
953 ASSERT_TRUE(g_kvDelegatePtr->CancelSync(syncId) == OK);
954 tempMutex.unlock();
955 std::this_thread::sleep_for(std::chrono::seconds(1));
956 return;
957 }
958 tempMutex.unlock();
959 }
960 });
961
962 std::mutex cancelMtx;
963 std::condition_variable cancelCv;
964 bool cancalFinished = false;
965
__anonde2bd9671202(const std::map<std::string, DeviceSyncProcess> &processMap) 966 DeviceSyncProcessCallback onProcess = [&](const std::map<std::string, DeviceSyncProcess> &processMap) {
967 bool isAllCancel = true;
968 for (auto &process: processMap) {
969 syncId = process.second.syncId;
970 if (process.second.errCode != COMM_FAILURE) {
971 isAllCancel = false;
972 }
973 }
974 if (isAllCancel) {
975 std::unique_lock<std::mutex> lock(cancelMtx);
976 cancalFinished = true;
977 cancelCv.notify_all();
978 }
979 };
980 /**
981 * @tc.steps: step2. deviceA call pull sync
982 * @tc.expected: step2. sync should return OK.
983 */
984 ASSERT_TRUE(g_kvDelegatePtr->Sync(option, onProcess) == OK);
985
986 // Wait onProcess complete.
987 {
988 std::unique_lock<std::mutex> lock2(cancelMtx);
__anonde2bd9671302() 989 cancelCv.wait(lock2, [&cancalFinished]() {return cancalFinished;});
990 }
991 // Wait until all the packets arrive.
992 std::this_thread::sleep_for(std::chrono::seconds(2));
993
994 /**
995 * @tc.steps: step4. Cancel abnormal syncId.
996 * @tc.expected: step4. return NOT_FOUND.
997 */
998 ASSERT_TRUE(g_kvDelegatePtr->CancelSync(syncId) == NOT_FOUND);
999 ASSERT_TRUE(g_kvDelegatePtr->CancelSync(0) == NOT_FOUND);
1000 ASSERT_TRUE(g_kvDelegatePtr->CancelSync(4294967295) == NOT_FOUND); // uint32_t max value 4294967295
1001 CancelTestEnd(entries, mtuSize);
1002 }
1003
1004 /**
 * @tc.name: PushFinishedNotify001
1006 * @tc.desc: Test remote device push finished notify function.
1007 * @tc.type: FUNC
1008 * @tc.require:
1009 * @tc.author: xushaohua
1010 */
1011 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, PushFinishedNotify001, TestSize.Level1)
1012 {
1013 std::vector<std::string> devices;
1014 devices.push_back(g_deviceB->GetDeviceId());
1015
1016 /**
1017 * @tc.steps: step1. deviceA call SetRemotePushFinishedNotify
1018 * @tc.expected: step1. set should return OK.
1019 */
1020 int pushfinishedFlag = 0;
1021 DBStatus status = g_kvDelegatePtr->SetRemotePushFinishedNotify(
__anonde2bd9671402(const RemotePushNotifyInfo &info) 1022 [&pushfinishedFlag](const RemotePushNotifyInfo &info) {
1023 EXPECT_TRUE(info.deviceId == DEVICE_B);
1024 pushfinishedFlag = 1;
1025 });
1026 ASSERT_EQ(status, OK);
1027
1028 /**
1029 * @tc.steps: step2. deviceB put k2, v2, and deviceA pull from deviceB
1030 * @tc.expected: step2. deviceA can not receive push finished notify
1031 */
1032 EXPECT_EQ(g_kvDelegatePtr->Put(KEY_2, VALUE_2), OK);
1033 std::map<std::string, DBStatus> result;
1034 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
1035 EXPECT_TRUE(status == OK);
1036 EXPECT_EQ(pushfinishedFlag, 0);
1037 pushfinishedFlag = 0;
1038
1039 /**
1040 * @tc.steps: step3. deviceB put k3, v3, and deviceA push and pull to deviceB
1041 * @tc.expected: step3. deviceA can not receive push finished notify
1042 */
1043 EXPECT_EQ(g_kvDelegatePtr->Put(KEY_3, VALUE_3), OK);
1044 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
1045 EXPECT_TRUE(status == OK);
1046 EXPECT_EQ(pushfinishedFlag, 0);
1047 pushfinishedFlag = 0;
1048
1049 /**
1050 * @tc.steps: step4. deviceA call SetRemotePushFinishedNotify to reset notify
1051 * @tc.expected: step4. set should return OK.
1052 */
__anonde2bd9671502(const RemotePushNotifyInfo &info) 1053 status = g_kvDelegatePtr->SetRemotePushFinishedNotify([&pushfinishedFlag](const RemotePushNotifyInfo &info) {
1054 EXPECT_TRUE(info.deviceId == DEVICE_B);
1055 pushfinishedFlag = 2;
1056 });
1057 ASSERT_EQ(status, OK);
1058
1059 /**
1060 * @tc.steps: step5. deviceA call SetRemotePushFinishedNotify set null to unregist
1061 * @tc.expected: step5. set should return OK.
1062 */
1063 status = g_kvDelegatePtr->SetRemotePushFinishedNotify(nullptr);
1064 ASSERT_EQ(status, OK);
1065 }
1066
1067 namespace {
RegOnDispatchWithDelayAck(bool & errCodeAck,bool & afterErrAck)1068 void RegOnDispatchWithDelayAck(bool &errCodeAck, bool &afterErrAck)
1069 {
1070 // just delay the busy ack
1071 g_communicatorAggregator->RegOnDispatch([&errCodeAck, &afterErrAck](const std::string &dev, Message *inMsg) {
1072 if (dev != g_deviceB->GetDeviceId()) {
1073 return;
1074 }
1075 auto *packet = inMsg->GetObject<DataAckPacket>();
1076 if (packet != nullptr && packet->GetRecvCode() == -E_BUSY) {
1077 errCodeAck = true;
1078 while (!afterErrAck) {
1079 }
1080 LOGW("NOW SEND BUSY ACK");
1081 } else if (errCodeAck) {
1082 afterErrAck = true;
1083 std::this_thread::sleep_for(std::chrono::seconds(1));
1084 }
1085 });
1086 }
1087
RegOnDispatchWithOffline(bool & offlineFlag,bool & invalid,condition_variable & conditionOffline)1088 void RegOnDispatchWithOffline(bool &offlineFlag, bool &invalid, condition_variable &conditionOffline)
1089 {
1090 g_communicatorAggregator->RegOnDispatch([&offlineFlag, &invalid, &conditionOffline](
1091 const std::string &dev, Message *inMsg) {
1092 auto *packet = inMsg->GetObject<DataAckPacket>();
1093 if (dev != DEVICE_B) {
1094 if (packet != nullptr && (packet->GetRecvCode() == LOCAL_WATER_MARK_NOT_INIT)) {
1095 offlineFlag = true;
1096 conditionOffline.notify_all();
1097 LOGW("[Dispatch] NOTIFY OFFLINE");
1098 std::this_thread::sleep_for(std::chrono::microseconds(EIGHT_HUNDRED));
1099 }
1100 } else if (!invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
1101 LOGW("[Dispatch] NOW INVALID THIS MSG");
1102 inMsg->SetMessageType(TYPE_INVALID);
1103 inMsg->SetMessageId(INVALID_MESSAGE_ID);
1104 invalid = true;
1105 }
1106 });
1107 }
1108
RegOnDispatchWithInvalidMsg(bool & invalid)1109 void RegOnDispatchWithInvalidMsg(bool &invalid)
1110 {
1111 g_communicatorAggregator->RegOnDispatch([&invalid](
1112 const std::string &dev, Message *inMsg) {
1113 if (dev == DEVICE_B && !invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
1114 LOGW("[Dispatch] NOW INVALID THIS MSG");
1115 inMsg->SetMessageType(TYPE_INVALID);
1116 inMsg->SetMessageId(INVALID_MESSAGE_ID);
1117 invalid = true;
1118 }
1119 });
1120 }
1121
PrepareEnv(vector<std::string> & devices,Key & key,Query & query)1122 void PrepareEnv(vector<std::string> &devices, Key &key, Query &query)
1123 {
1124 /**
1125 * @tc.steps: step1. ensure the watermark is no zero and finish timeSync and abilitySync
1126 * @tc.expected: step1. should return OK.
1127 */
1128 Value value = {'1'};
1129 std::map<std::string, DBStatus> result;
1130 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1131
1132 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result, query);
1133 EXPECT_TRUE(status == OK);
1134 ASSERT_TRUE(result[g_deviceB->GetDeviceId()] == OK);
1135 }
1136
Sync(KvStoreNbDelegate * kvDelegatePtr,vector<std::string> & devices,const DBStatus & targetStatus)1137 void Sync(KvStoreNbDelegate *kvDelegatePtr, vector<std::string> &devices, const DBStatus &targetStatus)
1138 {
1139 std::map<std::string, DBStatus> result;
1140 DBStatus status = g_tool.SyncTest(kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result);
1141 EXPECT_TRUE(status == OK);
1142 for (const auto &deviceId : devices) {
1143 ASSERT_TRUE(result[deviceId] == targetStatus);
1144 }
1145 }
1146
Sync(vector<std::string> & devices,const DBStatus & targetStatus)1147 void Sync(vector<std::string> &devices, const DBStatus &targetStatus)
1148 {
1149 Sync(g_kvDelegatePtr, devices, targetStatus);
1150 }
1151
SyncWithQuery(vector<std::string> & devices,const Query & query,const SyncMode & mode,const DBStatus & targetStatus)1152 void SyncWithQuery(vector<std::string> &devices, const Query &query, const SyncMode &mode,
1153 const DBStatus &targetStatus)
1154 {
1155 std::map<std::string, DBStatus> result;
1156 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, mode, result, query);
1157 EXPECT_TRUE(status == OK);
1158 for (const auto &deviceId : devices) {
1159 if (targetStatus == COMM_FAILURE) {
1160 // If syncTaskContext of deviceB is scheduled to be executed first, ClearAllSyncTask is
1161 // invoked when OfflineHandleByDevice is triggered, and SyncOperation::Finished() is triggered in advance.
1162 // The returned status is COMM_FAILURE.
1163 // If syncTaskContext of deviceB is not executed first, the error code is transparently transmitted.
1164 EXPECT_TRUE((result[deviceId] == static_cast<DBStatus>(-E_PERIPHERAL_INTERFACE_FAIL)) ||
1165 (result[deviceId] == COMM_FAILURE));
1166 } else {
1167 ASSERT_EQ(result[deviceId], targetStatus);
1168 }
1169 }
1170 }
1171
SyncWithQuery(vector<std::string> & devices,const Query & query,const DBStatus & targetStatus)1172 void SyncWithQuery(vector<std::string> &devices, const Query &query, const DBStatus &targetStatus)
1173 {
1174 SyncWithQuery(devices, query, DistributedDB::SYNC_MODE_PUSH_ONLY, targetStatus);
1175 }
1176
SyncWithDeviceOffline(vector<std::string> & devices,Key & key,const Query & query)1177 void SyncWithDeviceOffline(vector<std::string> &devices, Key &key, const Query &query)
1178 {
1179 Value value = {'2'};
1180 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1181
1182 /**
1183 * @tc.steps: step2. invalid the sync msg
1184 * @tc.expected: step2. should return TIME_OUT.
1185 */
1186 SyncWithQuery(devices, query, TIME_OUT);
1187
1188 /**
1189 * @tc.steps: step3. device offline when sync
1190 * @tc.expected: step3. should return COMM_FAILURE.
1191 */
1192 SyncWithQuery(devices, query, COMM_FAILURE);
1193 }
1194
PrepareWaterMarkError(std::vector<std::string> & devices,Query & query)1195 void PrepareWaterMarkError(std::vector<std::string> &devices, Query &query)
1196 {
1197 /**
1198 * @tc.steps: step1. prepare data
1199 */
1200 devices.push_back(g_deviceB->GetDeviceId());
1201 g_deviceB->Online();
1202
1203 Key key = {'1'};
1204 query = Query::Select().PrefixKey(key);
1205 PrepareEnv(devices, key, query);
1206
1207 /**
1208 * @tc.steps: step2. query sync and set queryWaterMark
1209 * @tc.expected: step2. should return OK.
1210 */
1211 Value value = {'2'};
1212 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1213 SyncWithQuery(devices, query, OK);
1214
1215 /**
1216 * @tc.steps: step3. sync and invalid msg for set local device waterMark
1217 * @tc.expected: step3. should return TIME_OUT.
1218 */
1219 bool invalidMsg = false;
1220 RegOnDispatchWithInvalidMsg(invalidMsg);
1221 value = {'3'};
1222 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1223 Sync(devices, TIME_OUT);
1224 g_communicatorAggregator->RegOnDispatch(nullptr);
1225 }
1226
RegOnDispatchWithoutDataPacket(std::atomic<int> & messageCount,bool calResponse=false)1227 void RegOnDispatchWithoutDataPacket(std::atomic<int> &messageCount, bool calResponse = false)
1228 {
1229 g_communicatorAggregator->RegOnDispatch([calResponse, &messageCount](const std::string &dev, Message *msg) {
1230 if (msg->GetMessageId() != TIME_SYNC_MESSAGE && msg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
1231 return;
1232 }
1233 if (dev != DEVICE_B || (!calResponse && msg->GetMessageType() != TYPE_REQUEST)) {
1234 return;
1235 }
1236 messageCount++;
1237 });
1238 }
1239
ReOpenDB()1240 void ReOpenDB()
1241 {
1242 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1243 g_kvDelegatePtr = nullptr;
1244 KvStoreNbDelegate::Option option;
1245 option.secOption.securityLabel = SecurityLabel::S3;
1246 option.secOption.securityFlag = SecurityFlag::SECE;
1247 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1248 ASSERT_TRUE(g_kvDelegateStatus == OK);
1249 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1250 }
1251 }
1252
1253 /**
 * @tc.name: AckSessionCheck001
1255 * @tc.desc: Test ack session check function.
1256 * @tc.type: FUNC
1257 * @tc.require:
1258 * @tc.author: zhangqiquan
1259 */
1260 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, AckSessionCheck001, TestSize.Level3)
1261 {
1262 std::vector<std::string> devices;
1263 devices.push_back(g_deviceB->GetDeviceId());
1264
1265 /**
1266 * @tc.steps: step1. deviceB sync to deviceA just for timeSync and abilitySync
1267 * @tc.expected: step1. should return OK.
1268 */
1269 ASSERT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == E_OK);
1270
1271 /**
1272 * @tc.steps: step2. deviceA StartTransaction for prevent other sync action deviceB sync will fail
1273 * @tc.expected: step2. should return OK.
1274 */
1275 ASSERT_TRUE(g_kvDelegatePtr->StartTransaction() == OK);
1276
1277 bool errCodeAck = false;
1278 bool afterErrAck = false;
1279 RegOnDispatchWithDelayAck(errCodeAck, afterErrAck);
1280
1281 Key key = {'1'};
1282 Value value = {'1'};
1283 Timestamp currentTime;
1284 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1285 EXPECT_TRUE(g_deviceB->PutData(key, value, currentTime, 0) == E_OK);
1286 EXPECT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == E_OK);
1287
1288 Value outValue;
1289 EXPECT_TRUE(g_kvDelegatePtr->Get(key, outValue) == NOT_FOUND);
1290
1291 /**
1292 * @tc.steps: step3. release the writeHandle and try again, sync success
1293 * @tc.expected: step3. should return OK.
1294 */
1295 EXPECT_TRUE(g_kvDelegatePtr->Commit() == OK);
1296 EXPECT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == E_OK);
1297
1298 EXPECT_TRUE(g_kvDelegatePtr->Get(key, outValue) == OK);
1299 EXPECT_EQ(outValue, value);
1300 }
1301
1302 /**
1303 * @tc.name: AckSafeCheck001
1304 * @tc.desc: Test ack session check filter all bad ack in device offline scene.
1305 * @tc.type: FUNC
1306 * @tc.require:
1307 * @tc.author: zhangqiquan
1308 */
1309 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, AckSafeCheck001, TestSize.Level3)
1310 {
1311 std::vector<std::string> devices;
1312 devices.push_back(g_deviceB->GetDeviceId());
1313 g_deviceB->Online();
1314
1315 Key key = {'1'};
1316 Query query = Query::Select().PrefixKey(key);
1317 PrepareEnv(devices, key, query);
1318
1319 std::condition_variable conditionOnline;
1320 std::condition_variable conditionOffline;
1321 bool onlineFlag = false;
1322 bool invalid = false;
1323 bool offlineFlag = false;
__anonde2bd9671b02() 1324 thread subThread([&onlineFlag, &conditionOnline, &offlineFlag, &conditionOffline]() {
1325 LOGW("[Dispatch] NOW DEVICES IS OFFLINE");
1326 std::mutex offlineMtx;
1327 std::unique_lock<std::mutex> lck(offlineMtx);
1328 conditionOffline.wait(lck, [&offlineFlag]{ return offlineFlag; });
1329 g_deviceB->Offline();
1330 std::this_thread::sleep_for(std::chrono::seconds(1));
1331 g_deviceB->Online();
1332 onlineFlag = true;
1333 conditionOnline.notify_all();
1334 LOGW("[Dispatch] NOW DEVICES IS ONLINE");
1335 });
1336 subThread.detach();
1337
1338 RegOnDispatchWithOffline(offlineFlag, invalid, conditionOffline);
1339
1340 SyncWithDeviceOffline(devices, key, query);
1341
1342 std::mutex onlineMtx;
1343 std::unique_lock<std::mutex> lck(onlineMtx);
__anonde2bd9671d02null1344 conditionOnline.wait(lck, [&onlineFlag]{ return onlineFlag; });
1345
1346 /**
1347 * @tc.steps: step4. sync again if has problem it will sync never end
1348 * @tc.expected: step4. should return OK.
1349 */
1350 SyncWithQuery(devices, query, OK);
1351 }
1352
1353 /**
1354 * @tc.name: WaterMarkCheck001
1355 * @tc.desc: Test waterMark work correct in lost package scene.
1356 * @tc.type: FUNC
1357 * @tc.require:
1358 * @tc.author: zhangqiquan
1359 */
1360 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, WaterMarkCheck001, TestSize.Level1)
1361 {
1362 std::vector<std::string> devices;
1363 Query query = Query::Select();
1364 PrepareWaterMarkError(devices, query);
1365
1366 /**
1367 * @tc.steps: step4. sync again see it work correct
1368 * @tc.expected: step4. should return OK.
1369 */
1370 SyncWithQuery(devices, query, OK);
1371 }
1372
1373 /**
1374 * @tc.name: WaterMarkCheck002
1375 * @tc.desc: Test pull work correct in error waterMark scene.
1376 * @tc.type: FUNC
1377 * @tc.require:
1378 * @tc.author: zhangqiquan
1379 */
1380 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, WaterMarkCheck002, TestSize.Level1)
1381 {
1382 std::vector<std::string> devices;
1383 Query query = Query::Select();
1384 PrepareWaterMarkError(devices, query);
1385
1386 /**
1387 * @tc.steps: step4. sync again see it work correct
1388 * @tc.expected: step4. should return OK.
1389 */
1390 Key key = {'2'};
1391 ASSERT_TRUE(g_kvDelegatePtr->Put(key, {}) == OK);
1392 query = Query::Select();
1393 SyncWithQuery(devices, query, DistributedDB::SYNC_MODE_PULL_ONLY, OK);
1394
1395 VirtualDataItem item;
1396 EXPECT_EQ(g_deviceB->GetData(key, item), -E_NOT_FOUND);
1397 }
1398
RegOnDispatchToGetSyncCount(int & sendRequestCount,int sleepMs=0)1399 void RegOnDispatchToGetSyncCount(int &sendRequestCount, int sleepMs = 0)
1400 {
1401 g_communicatorAggregator->RegOnDispatch([sleepMs, &sendRequestCount](
1402 const std::string &dev, Message *inMsg) {
1403 if (dev == DEVICE_B && inMsg->GetMessageType() == TYPE_REQUEST) {
1404 std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
1405 sendRequestCount++;
1406 LOGD("sendRequestCount++...");
1407 }
1408 });
1409 }
1410
TestDifferentSyncMode(SyncMode mode)1411 void TestDifferentSyncMode(SyncMode mode)
1412 {
1413 std::vector<std::string> devices;
1414 devices.push_back(g_deviceB->GetDeviceId());
1415
1416 /**
1417 * @tc.steps: step1. deviceA put {k1, v1}
1418 */
1419 Key key = {'1'};
1420 Value value = {'1'};
1421 DBStatus status = g_kvDelegatePtr->Put(key, value);
1422 ASSERT_TRUE(status == OK);
1423
1424 int sendRequestCount = 0;
1425 RegOnDispatchToGetSyncCount(sendRequestCount);
1426
1427 /**
1428 * @tc.steps: step2. deviceA call sync and wait
1429 * @tc.expected: step2. sync should return OK.
1430 */
1431 std::map<std::string, DBStatus> result;
1432 status = g_tool.SyncTest(g_kvDelegatePtr, devices, mode, result);
1433 ASSERT_TRUE(status == OK);
1434
1435 /**
1436 * @tc.expected: step2. onComplete should be called, DeviceB have {k1,v1}, send request message 3 times
1437 */
1438 ASSERT_TRUE(result.size() == devices.size());
1439 for (const auto &pair : result) {
1440 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1441 EXPECT_TRUE(pair.second == OK);
1442 }
1443 VirtualDataItem item;
1444 g_deviceB->GetData(key, item);
1445 EXPECT_TRUE(item.value == value);
1446
1447 EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1448
1449 /**
1450 * @tc.steps: step3. reset sendRequestCount to 0, deviceA call sync and wait again without any change in db
1451 * @tc.expected: step3. sync should return OK, and sendRequestCount should be 1, because this merge can not
1452 * be skipped
1453 */
1454 sendRequestCount = 0;
1455 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1456 ASSERT_TRUE(status == OK);
1457 EXPECT_EQ(sendRequestCount, 1);
1458 }
1459
1460 /**
 * @tc.name: SyncMergeCheck001
1462 * @tc.desc: Test push sync task merge, task can not be merged when the two sync task is not in the queue
1463 * at the same time.
1464 * @tc.type: FUNC
1465 * @tc.require:
1466 * @tc.author: zhangshijie
1467 */
1468 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck001, TestSize.Level1)
1469 {
1470 TestDifferentSyncMode(SYNC_MODE_PUSH_ONLY);
1471 }
1472
1473 /**
 * @tc.name: SyncMergeCheck002
1475 * @tc.desc: Test push_pull sync task merge, task can not be merged when the two sync task is not in the queue
1476 * at the same time.
1477 * @tc.type: FUNC
1478 * @tc.require:
1479 * @tc.author: zhangshijie
1480 */
1481 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck002, TestSize.Level1)
1482 {
1483 TestDifferentSyncMode(SYNC_MODE_PUSH_PULL);
1484 }
1485
PrepareForSyncMergeTest(std::vector<std::string> & devices,int & sendRequestCount)1486 void PrepareForSyncMergeTest(std::vector<std::string> &devices, int &sendRequestCount)
1487 {
1488 /**
1489 * @tc.steps: step1. deviceA put {k1, v1}
1490 */
1491 Key key = {'1'};
1492 Value value = {'1'};
1493 DBStatus status = g_kvDelegatePtr->Put(key, value);
1494 ASSERT_TRUE(status == OK);
1495
1496 RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1497
1498 /**
1499 * @tc.steps: step2. deviceA call sync and don't wait
1500 * @tc.expected: step2. sync should return OK.
1501 */
1502 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
1503 [&sendRequestCount, devices, key, value](const std::map<std::string, DBStatus>& statusMap) {
1504 ASSERT_TRUE(statusMap.size() == devices.size());
1505 for (const auto &pair : statusMap) {
1506 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1507 EXPECT_TRUE(pair.second == OK);
1508 }
1509 VirtualDataItem item;
1510 g_deviceB->GetData(key, item);
1511 EXPECT_EQ(item.value, value);
1512 EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1513
1514 // reset sendRequestCount to 0
1515 sendRequestCount = 0;
1516 });
1517 ASSERT_TRUE(status == OK);
1518 }
1519
1520 /**
 * @tc.name: SyncMergeCheck003
1522 * @tc.desc: Test push sync task merge, task can not be merged when there is change in db since last push sync
1523 * @tc.type: FUNC
1524 * @tc.require:
1525 * @tc.author: zhangshijie
1526 */
1527 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck003, TestSize.Level3)
1528 {
1529 DBStatus status = OK;
1530 std::vector<std::string> devices;
1531 devices.push_back(g_deviceB->GetDeviceId());
1532
1533 int sendRequestCount = 0;
1534 PrepareForSyncMergeTest(devices, sendRequestCount);
1535
1536 /**
1537 * @tc.steps: step3. deviceA call sync and don't wait
1538 * @tc.expected: step3. sync should return OK.
1539 */
1540 Key key = {'1'};
1541 Value value = {'2'};
1542 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anonde2bd9672002(const std::map<std::string, DBStatus>& statusMap) 1543 [&sendRequestCount, devices, key, value, this](const std::map<std::string, DBStatus>& statusMap) {
1544 /**
1545 * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1546 * skipped, but it is no need to do time sync and ability sync, only need to do data sync
1547 */
1548 ASSERT_TRUE(statusMap.size() == devices.size());
1549 for (const auto &pair : statusMap) {
1550 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1551 EXPECT_TRUE(pair.second == OK);
1552 }
1553 VirtualDataItem item;
1554 g_deviceB->GetData(key, item);
1555 EXPECT_EQ(item.value, value);
1556 });
1557 ASSERT_TRUE(status == OK);
1558
1559 /**
1560 * @tc.steps: step4. deviceA put {k1, v2}
1561 */
1562 while (sendRequestCount < TWO_CNT) {
1563 std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1564 }
1565 status = g_kvDelegatePtr->Put(key, value);
1566 ASSERT_TRUE(status == OK);
1567 // wait for the second sync task finish
1568 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1569 EXPECT_EQ(sendRequestCount, 1);
1570 }
1571
1572 /**
 * @tc.name: SyncMergeCheck004
1574 * @tc.desc: Test push sync task merge, task can be merged when there is no change in db since last push sync
1575 * @tc.type: FUNC
1576 * @tc.require:
1577 * @tc.author: zhangshijie
1578 */
1579 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck004, TestSize.Level3)
1580 {
1581 DBStatus status = OK;
1582 std::vector<std::string> devices;
1583 devices.push_back(g_deviceB->GetDeviceId());
1584
1585 int sendRequestCount = 0;
1586 PrepareForSyncMergeTest(devices, sendRequestCount);
1587
1588 /**
1589 * @tc.steps: step3. deviceA call sync and don't wait
1590 * @tc.expected: step3. sync should return OK.
1591 */
1592 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anonde2bd9672102(const std::map<std::string, DBStatus>& statusMap) 1593 [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1594 /**
1595 * @tc.expected: when the second sync task return, sendRequestCount should be 0, because this merge can be
1596 * skipped
1597 */
1598 ASSERT_TRUE(statusMap.size() == devices.size());
1599 for (const auto &pair : statusMap) {
1600 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1601 EXPECT_TRUE(pair.second == OK);
1602 }
1603 });
1604 ASSERT_TRUE(status == OK);
1605 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1606 EXPECT_EQ(sendRequestCount, 0);
1607 }
1608
RegOnDispatchWithInvalidMsgAndCnt(int & sendRequestCount,int sleepMs,bool & invalid)1609 void RegOnDispatchWithInvalidMsgAndCnt(int &sendRequestCount, int sleepMs, bool &invalid)
1610 {
1611 g_communicatorAggregator->RegOnDispatch([&sendRequestCount, sleepMs, &invalid](
1612 const std::string &dev, Message *inMsg) {
1613 if (dev == DEVICE_B && !invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
1614 inMsg->SetMessageType(TYPE_INVALID);
1615 inMsg->SetMessageId(INVALID_MESSAGE_ID);
1616 sendRequestCount++;
1617 invalid = true;
1618 LOGW("[Dispatch]invalid THIS MSG, sendRequestCount = %d", sendRequestCount);
1619 std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
1620 }
1621 });
1622 }
1623
1624 /**
 * @tc.name: SyncMergeCheck005
1626 * @tc.desc: Test push sync task merge, task cannot be merged when the last push sync is failed
1627 * @tc.type: FUNC
1628 * @tc.require:
1629 * @tc.author: zhangshijie
1630 */
1631 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck005, TestSize.Level3)
1632 {
1633 DBStatus status = OK;
1634 std::vector<std::string> devices;
1635 devices.push_back(g_deviceB->GetDeviceId());
1636
1637 /**
1638 * @tc.steps: step1. deviceA put {k1, v1}
1639 */
1640 Key key = {'1'};
1641 Value value = {'1'};
1642 status = g_kvDelegatePtr->Put(key, value);
1643 ASSERT_TRUE(status == OK);
1644
1645 int sendRequestCount = 0;
1646 bool invalid = false;
1647 RegOnDispatchWithInvalidMsgAndCnt(sendRequestCount, SLEEP_MILLISECONDS, invalid);
1648
1649 /**
1650 * @tc.steps: step2. deviceA call sync and don't wait
1651 * @tc.expected: step2. sync should return TIME_OUT.
1652 */
1653 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anonde2bd9672302(const std::map<std::string, DBStatus>& statusMap) 1654 [&sendRequestCount, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1655 ASSERT_TRUE(statusMap.size() == devices.size());
1656 for (const auto &deviceId : devices) {
1657 ASSERT_EQ(statusMap.at(deviceId), TIME_OUT);
1658 }
1659 });
1660 EXPECT_TRUE(status == OK);
1661
1662 /**
1663 * @tc.steps: step3. deviceA call sync and don't wait
1664 * @tc.expected: step3. sync should return OK.
1665 */
1666 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anonde2bd9672402(const std::map<std::string, DBStatus>& statusMap) 1667 [key, value, &sendRequestCount, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1668 /**
1669 * @tc.expected: when the second sync task return, sendRequestCount should be 3, because this merge can not be
1670 * skipped, deviceB should have {k1, v1}.
1671 */
1672 ASSERT_TRUE(statusMap.size() == devices.size());
1673 for (const auto &pair : statusMap) {
1674 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1675 EXPECT_EQ(pair.second, OK);
1676 }
1677 VirtualDataItem item;
1678 g_deviceB->GetData(key, item);
1679 EXPECT_EQ(item.value, value);
1680 });
1681 ASSERT_TRUE(status == OK);
1682 while (sendRequestCount < 1) {
1683 std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1684 }
1685 sendRequestCount = 0;
1686 RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1687
1688 // wait for the second sync task finish
1689 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1690 EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1691 }
1692
PrePareForQuerySyncMergeTest(bool isQuerySync,std::vector<std::string> & devices,Key & key,Value & value,int & sendRequestCount)1693 void PrePareForQuerySyncMergeTest(bool isQuerySync, std::vector<std::string> &devices,
1694 Key &key, Value &value, int &sendRequestCount)
1695 {
1696 DBStatus status = OK;
1697 /**
1698 * @tc.steps: step1. deviceA put {k1, v1}...{k10, v10}
1699 */
1700 Query query = Query::Select().PrefixKey(key);
1701 const int dataSize = 10;
1702 for (int i = 0; i < dataSize; i++) {
1703 key.push_back(i);
1704 value.push_back(i);
1705 status = g_kvDelegatePtr->Put(key, value);
1706 ASSERT_TRUE(status == OK);
1707 key.pop_back();
1708 value.pop_back();
1709 }
1710
1711 RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1712 /**
1713 * @tc.steps: step2. deviceA call query sync and don't wait
1714 * @tc.expected: step2. sync should return OK.
1715 */
1716 auto completeCallBack = [&sendRequestCount, &key, &value, dataSize, devices]
1717 (const std::map<std::string, DBStatus>& statusMap) {
1718 ASSERT_TRUE(statusMap.size() == devices.size());
1719 for (const auto &pair : statusMap) {
1720 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1721 EXPECT_EQ(pair.second, OK);
1722 }
1723 // when first sync finish, DeviceB have {k1,v1}, {k3,v3}, {k5,v5} .. send request message 3 times
1724 VirtualDataItem item;
1725 for (int i = 0; i < dataSize; i++) {
1726 key.push_back(i);
1727 value.push_back(i);
1728 g_deviceB->GetData(key, item);
1729 EXPECT_EQ(item.value, value);
1730 key.pop_back();
1731 value.pop_back();
1732 }
1733 EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1734 // reset sendRequestCount to 0
1735 sendRequestCount = 0;
1736 };
1737 if (isQuerySync) {
1738 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, completeCallBack, query, false);
1739 } else {
1740 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, completeCallBack);
1741 }
1742 ASSERT_TRUE(status == OK);
1743 }
1744
1745 /**
1746 * @tc.name: QuerySyncMergeCheck001
1747 * @tc.desc: Test query push sync task merge, task can be merged when there is no change in db since last query sync
1748 * @tc.type: FUNC
1749 * @tc.require:
1750 * @tc.author: zhangshijie
1751 */
1752 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck001, TestSize.Level3)
1753 {
1754 std::vector<std::string> devices;
1755 int sendRequestCount = 0;
1756 devices.push_back(g_deviceB->GetDeviceId());
1757
1758 Key key {'1'};
1759 Value value {'1'};
1760 Query query = Query::Select().PrefixKey(key);
1761 PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1762
1763 /**
1764 * @tc.steps: step3. deviceA call query sync and don't wait
1765 * @tc.expected: step3. sync should return OK.
1766 */
1767 DBStatus status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anonde2bd9672602(const std::map<std::string, DBStatus>& statusMap) 1768 [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1769 /**
1770 * @tc.expected: when the second sync task return, sendRequestCount should be 0, because this merge can be
1771 * skipped because there is no change in db since last query sync
1772 */
1773 ASSERT_TRUE(statusMap.size() == devices.size());
1774 for (const auto &pair : statusMap) {
1775 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1776 EXPECT_TRUE(pair.second == OK);
1777 }
1778 }, query, false);
1779 ASSERT_TRUE(status == OK);
1780 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1781 EXPECT_EQ(sendRequestCount, 0);
1782 }
1783
1784 /**
1785 * @tc.name: QuerySyncMergeCheck002
1786 * @tc.desc: Test query push sync task merge, task can not be merged when there is change in db since last sync
1787 * @tc.type: FUNC
1788 * @tc.require:
1789 * @tc.author: zhangshijie
1790 */
1791 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck002, TestSize.Level3)
1792 {
1793 std::vector<std::string> devices;
1794 int sendRequestCount = 0;
1795 devices.push_back(g_deviceB->GetDeviceId());
1796
1797 Key key {'1'};
1798 Value value {'1'};
1799 Query query = Query::Select().PrefixKey(key);
1800 PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1801
1802 /**
1803 * @tc.steps: step3. deviceA call query sync and don't wait
1804 * @tc.expected: step3. sync should return OK.
1805 */
1806 Value value3{'3'};
1807 DBStatus status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anonde2bd9672702(const std::map<std::string, DBStatus>& statusMap) 1808 [&sendRequestCount, devices, key, value3, this](const std::map<std::string, DBStatus>& statusMap) {
1809 /**
1810 * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1811 * skipped when there is change in db since last query sync, deviceB have {k1, v1'}
1812 */
1813 ASSERT_TRUE(statusMap.size() == devices.size());
1814 for (const auto &pair : statusMap) {
1815 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1816 EXPECT_TRUE(pair.second == OK);
1817 }
1818 VirtualDataItem item;
1819 g_deviceB->GetData(key, item);
1820 EXPECT_TRUE(item.value == value3);
1821 EXPECT_EQ(sendRequestCount, 1);
1822 }, query, false);
1823 ASSERT_TRUE(status == OK);
1824
1825 /**
1826 * @tc.steps: step4. deviceA put {k1, v1'}
1827 * @tc.steps: step4. reset sendRequestCount to 0, deviceA call sync and wait
1828 * @tc.expected: step4. sync should return OK, and sendRequestCount should be 1, because this merge can not
1829 * be skipped
1830 */
1831 while (sendRequestCount < TWO_CNT) {
1832 std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1833 }
1834 g_kvDelegatePtr->Put(key, value3);
1835 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1836 }
1837
1838 /**
1839 * @tc.name: QuerySyncMergeCheck003
1840 * @tc.desc: Test query push sync task merge, task can not be merged when then query id is different
1841 * @tc.type: FUNC
1842 * @tc.require:
1843 * @tc.author: zhangshijie
1844 */
1845 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck003, TestSize.Level3)
1846 {
1847 std::vector<std::string> devices;
1848 int sendRequestCount = 0;
1849 devices.push_back(g_deviceB->GetDeviceId());
1850
1851 Key key {'1'};
1852 Value value {'1'};
1853 PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1854
1855 /**
1856 * @tc.steps: step3. deviceA call another query sync
1857 * @tc.expected: step3. sync should return OK.
1858 */
1859 Key key2 = {'2'};
1860 Value value2 = {'2'};
1861 DBStatus status = g_kvDelegatePtr->Put(key2, value2);
1862 ASSERT_TRUE(status == OK);
1863 Query query2 = Query::Select().PrefixKey(key2);
1864 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anonde2bd9672802(const std::map<std::string, DBStatus>& statusMap) 1865 [&sendRequestCount, key2, value2, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1866 /**
1867 * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1868 * skipped, deviceB have {k2,v2}
1869 */
1870 ASSERT_TRUE(statusMap.size() == devices.size());
1871 for (const auto &pair : statusMap) {
1872 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1873 EXPECT_TRUE(pair.second == OK);
1874 }
1875 VirtualDataItem item;
1876 g_deviceB->GetData(key2, item);
1877 EXPECT_TRUE(item.value == value2);
1878 EXPECT_EQ(sendRequestCount, 1);
1879 }, query2, false);
1880 ASSERT_TRUE(status == OK);
1881 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1882 }
1883
1884 /**
1885 * @tc.name: QuerySyncMergeCheck004
1886 * @tc.desc: Test query push sync task merge, task can be merged when there is no change in db since last push sync
1887 * @tc.type: FUNC
1888 * @tc.require:
1889 * @tc.author: zhangshijie
1890 */
1891 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck004, TestSize.Level3)
1892 {
1893 DBStatus status = OK;
1894 std::vector<std::string> devices;
1895 devices.push_back(g_deviceB->GetDeviceId());
1896
1897 Key key {'1'};
1898 Value value {'1'};
1899 int sendRequestCount = 0;
1900 PrePareForQuerySyncMergeTest(false, devices, key, value, sendRequestCount);
1901
1902 /**
1903 * @tc.steps: step3. deviceA call query sync without any change in db
1904 * @tc.expected: step3. sync should return OK, and sendRequestCount should be 0, because this merge can be skipped
1905 */
1906 Query query = Query::Select().PrefixKey(key);
1907 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anonde2bd9672902(const std::map<std::string, DBStatus>& statusMap) 1908 [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1909 /**
1910 * @tc.expected step3: when the second sync task return, sendRequestCount should be 0, because this merge
1911 * can be skipped because there is no change in db since last push sync
1912 */
1913 ASSERT_TRUE(statusMap.size() == devices.size());
1914 for (const auto &pair : statusMap) {
1915 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1916 EXPECT_TRUE(pair.second == OK);
1917 }
1918 }, query, false);
1919 ASSERT_TRUE(status == OK);
1920 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1921 EXPECT_EQ(sendRequestCount, 0);
1922 }
1923
1924 /**
1925 * @tc.name: GetDataNotify001
1926 * @tc.desc: Test GetDataNotify function, delay < 30s should sync ok, > 36 should timeout
1927 * @tc.type: FUNC
1928 * @tc.require:
1929 * @tc.author: zhangqiquan
1930 */
1931 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify001, TestSize.Level3)
1932 {
1933 ASSERT_NE(g_kvDelegatePtr, nullptr);
1934 DBStatus status = OK;
1935 std::vector<std::string> devices;
1936 devices.push_back(g_deviceB->GetDeviceId());
1937 const std::string DEVICE_A = "real_device";
1938 /**
1939 * @tc.steps: step1. deviceB set get data delay 40s
1940 */
1941 g_deviceB->DelayGetSyncData(WAIT_40_SECONDS);
1942 g_communicatorAggregator->SetTimeout(DEVICE_A, TIMEOUT_6_SECONDS);
1943
1944 /**
1945 * @tc.steps: step2. deviceA call sync and wait
1946 * @tc.expected: step2. sync should return OK. onComplete should be called, deviceB sync TIME_OUT.
1947 */
1948 std::map<std::string, DBStatus> result;
1949 std::map<std::string, int> virtualRes;
1950 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1951 EXPECT_EQ(status, OK);
1952 EXPECT_EQ(result.size(), devices.size());
1953 EXPECT_EQ(result[DEVICE_B], TIME_OUT);
1954 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1955 Query query = Query::Select();
__anonde2bd9672a02(std::map<std::string, int> resMap) 1956 g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1957 virtualRes = std::move(resMap);
1958 }, true);
1959 EXPECT_EQ(virtualRes.size(), devices.size());
1960 EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_TIMEOUT));
1961 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1962
1963 /**
1964 * @tc.steps: step3. deviceB set get data delay 30s
1965 */
1966 g_deviceB->DelayGetSyncData(WAIT_30_SECONDS);
1967 /**
1968 * @tc.steps: step4. deviceA call sync and wait
1969 * @tc.expected: step4. sync should return OK. onComplete should be called, deviceB sync OK.
1970 */
1971 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1972 EXPECT_EQ(status, OK);
1973 EXPECT_EQ(result.size(), devices.size());
1974 EXPECT_EQ(result[DEVICE_B], OK);
1975 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
__anonde2bd9672b02(std::map<std::string, int> resMap) 1976 g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1977 virtualRes = std::move(resMap);
1978 }, true);
1979 EXPECT_EQ(virtualRes.size(), devices.size());
1980 EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_FINISHED_ALL));
1981 g_deviceB->DelayGetSyncData(0);
1982 }
1983
1984 /**
1985 * @tc.name: GetDataNotify002
1986 * @tc.desc: Test GetDataNotify function, two device sync each other at same time
1987 * @tc.type: FUNC
1988 * @tc.require:
1989 * @tc.author: zhangqiquan
1990 */
1991 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify002, TestSize.Level3)
1992 {
1993 ASSERT_NE(g_kvDelegatePtr, nullptr);
1994 DBStatus status = OK;
1995 std::vector<std::string> devices;
1996 devices.push_back(g_deviceB->GetDeviceId());
1997 const std::string DEVICE_A = "real_device";
1998
1999 /**
2000 * @tc.steps: step1. deviceA sync first to finish time sync and ability sync
2001 */
2002 std::map<std::string, DBStatus> result;
2003 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
2004 EXPECT_EQ(status, OK);
2005 EXPECT_EQ(result.size(), devices.size());
2006 EXPECT_EQ(result[DEVICE_B], OK);
2007 /**
2008 * @tc.steps: step2. deviceB set get data delay 30s
2009 */
2010 g_deviceB->DelayGetSyncData(WAIT_30_SECONDS);
2011
2012 /**
2013 * @tc.steps: step3. deviceB call sync and wait
2014 */
__anonde2bd9672c02() 2015 std::thread asyncThread([]() {
2016 std::map<std::string, int> virtualRes;
2017 Query query = Query::Select();
2018 g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
2019 virtualRes = std::move(resMap);
2020 }, true);
2021 });
2022
2023 /**
2024 * @tc.steps: step4. deviceA call sync and wait
2025 * @tc.expected: step4. sync should return OK. because notify timer trigger (30s - 1s)/2s => 15times
2026 */
2027 std::this_thread::sleep_for(std::chrono::seconds(1));
2028 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
2029 EXPECT_EQ(status, OK);
2030 EXPECT_EQ(result.size(), devices.size());
2031 EXPECT_EQ(result[DEVICE_B], OK);
2032 asyncThread.join();
2033 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
2034 }
2035
2036 /**
2037 * @tc.name: DelaySync001
2038 * @tc.desc: Test delay first packet will not effect data conflict
2039 * @tc.type: FUNC
2040 * @tc.require:
2041 * @tc.author: zqq
2042 */
2043 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, DelaySync001, TestSize.Level3)
2044 {
2045 // B put (k, b) after A put (k, a)
2046 Key key = {'k'};
2047 Value aValue = {'a'};
2048 g_kvDelegatePtr->Put(key, aValue);
2049 std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for data conflict
2050 Timestamp currentTime = TimeHelper::GetSysCurrentTime() + TimeHelper::BASE_OFFSET;
2051 Value bValue = {'b'};
2052 EXPECT_EQ(g_deviceB->PutData(key, bValue, currentTime, 0), E_OK);
2053
2054 // delay time sync message, delay time/2 should greater than put sleep time
2055 g_communicatorAggregator->SetTimeout(DEVICE_B, DBConstant::MAX_TIMEOUT);
2056 g_communicatorAggregator->SetTimeout("real_device", DBConstant::MAX_TIMEOUT);
__anonde2bd9672e02(const std::string &dstTarget, const Message *msg) 2057 g_communicatorAggregator->RegBeforeDispatch([](const std::string &dstTarget, const Message *msg) {
2058 if (dstTarget == DEVICE_B && msg->GetMessageId() == MessageId::TIME_SYNC_MESSAGE) {
2059 std::this_thread::sleep_for(std::chrono::seconds(3)); // sleep for 3s
2060 }
2061 });
2062
2063 // A call sync and (k, b) in A
2064 std::vector<std::string> devices;
2065 devices.push_back(g_deviceB->GetDeviceId());
2066 std::map<std::string, DBStatus> result;
2067 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
2068 EXPECT_EQ(status, OK);
2069 EXPECT_EQ(result.size(), devices.size());
2070 EXPECT_EQ(result[DEVICE_B], OK);
2071
2072 Value actualValue;
2073 g_kvDelegatePtr->Get(key, actualValue);
2074 EXPECT_EQ(actualValue, bValue);
2075 g_communicatorAggregator->RegBeforeDispatch(nullptr);
2076 }
2077
2078 /**
2079 * @tc.name: KVAbilitySyncOpt001
2080 * @tc.desc: check ability sync 2 packet
2081 * @tc.type: FUNC
2082 * @tc.require:
2083 * @tc.author: zhangqiquan
2084 */
2085 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVAbilitySyncOpt001, TestSize.Level0)
2086 {
2087 /**
2088 * @tc.steps: step1. record packet
2089 * @tc.expected: step1. sync should failed in source.
2090 */
2091 std::atomic<int> messageCount = 0;
__anonde2bd9672f02(const std::string &dev, Message *msg) 2092 g_communicatorAggregator->RegOnDispatch([&messageCount](const std::string &dev, Message *msg) {
2093 if (msg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
2094 return;
2095 }
2096 messageCount++;
2097 EXPECT_GE(g_kvDelegatePtr->GetTaskCount(), 1);
2098 });
2099 /**
2100 * @tc.steps: step2. deviceA call sync and wait
2101 * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
2102 */
2103 DBStatus status = OK;
2104 std::vector<std::string> devices;
2105 devices.push_back(g_deviceB->GetDeviceId());
2106 std::map<std::string, DBStatus> result;
2107 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2108 EXPECT_EQ(status, OK);
2109 EXPECT_EQ(messageCount, 2); // 2 ability sync
2110 for (const auto &pair : result) {
2111 EXPECT_EQ(pair.second, OK);
2112 }
2113 }
2114
2115 /**
2116 * @tc.name: KVAbilitySyncOpt002
2117 * @tc.desc: check get task count while conn is nullptr.
2118 * @tc.type: FUNC
2119 * @tc.require:
2120 * @tc.author: caihaoting
2121 */
2122 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVAbilitySyncOpt002, TestSize.Level0)
2123 {
2124 /**
2125 * @tc.steps: step1. record packet while conn is nullptr.
2126 * @tc.expected: step1. sync should failed in source and get task count return DB_ERROR.
2127 */
2128 auto kvStoreImpl = static_cast<KvStoreNbDelegateImpl *>(g_kvDelegatePtr);
2129 EXPECT_EQ(kvStoreImpl->Close(), OK);
2130 std::atomic<int> messageCount = 0;
__anonde2bd9673002(const std::string &dev, Message *msg) 2131 g_communicatorAggregator->RegOnDispatch([&messageCount](const std::string &dev, Message *msg) {
2132 if (msg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
2133 return;
2134 }
2135 messageCount++;
2136 EXPECT_EQ(g_kvDelegatePtr->GetTaskCount(), DB_ERROR);
2137 });
2138 }
2139
2140 /**
2141 * @tc.name: KVSyncOpt001
2142 * @tc.desc: check time sync and ability sync once
2143 * @tc.type: FUNC
2144 * @tc.require:
2145 * @tc.author: zhangqiquan
2146 */
2147 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt001, TestSize.Level0)
2148 {
2149 /**
2150 * @tc.steps: step1. record packet which send to B
2151 */
2152 std::atomic<int> messageCount = 0;
2153 RegOnDispatchWithoutDataPacket(messageCount);
2154 /**
2155 * @tc.steps: step2. deviceA call sync and wait
2156 * @tc.expected: step2. sync should return OK.
2157 */
2158 std::vector<std::string> devices;
2159 devices.push_back(g_deviceB->GetDeviceId());
2160 Sync(devices, OK);
2161 EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
2162 /**
2163 * @tc.steps: step3. reopen kv store
2164 * @tc.expected: step3. reopen OK.
2165 */
2166 ReOpenDB();
2167 /**
2168 * @tc.steps: step4. reopen kv store and sync again
2169 * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
2170 */
2171 messageCount = 0;
2172 Sync(devices, OK);
2173 EXPECT_EQ(messageCount, 0);
2174 g_communicatorAggregator->RegOnDispatch(nullptr);
2175 }
2176
2177 /**
2178 * @tc.name: KVSyncOpt002
2179 * @tc.desc: check device time sync once
2180 * @tc.type: FUNC
2181 * @tc.require:
2182 * @tc.author: zhangqiquan
2183 */
2184 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt002, TestSize.Level0)
2185 {
2186 /**
2187 * @tc.steps: step1. record packet which send to B
2188 */
2189 std::atomic<int> messageCount = 0;
2190 RegOnDispatchWithoutDataPacket(messageCount);
2191 /**
2192 * @tc.steps: step2. deviceA call sync and wait
2193 * @tc.expected: step2. sync should return OK.
2194 */
2195 std::vector<std::string> devices;
2196 devices.push_back(g_deviceB->GetDeviceId());
2197 Sync(devices, OK);
2198 EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
2199 // close kv store avoid packet dispatch error
2200 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
2201 g_kvDelegatePtr = nullptr;
2202 ASSERT_EQ(g_mgr.DeleteKvStore(STORE_ID), OK);
2203 EXPECT_TRUE(RuntimeContext::GetInstance()->IsTimeTickMonitorValid());
2204 /**
2205 * @tc.steps: step3. open new kv store
2206 * @tc.expected: step3. open OK.
2207 */
2208 KvStoreNbDelegate::Option option;
2209 option.secOption.securityLabel = SecurityLabel::S3;
2210 option.secOption.securityFlag = SecurityFlag::SECE;
2211 KvStoreNbDelegate *delegate2 = nullptr;
__anonde2bd9673102(DBStatus status, KvStoreNbDelegate *delegate) 2212 g_mgr.GetKvStore(STORE_ID_2, option, [&delegate2](DBStatus status, KvStoreNbDelegate *delegate) {
2213 delegate2 = delegate;
2214 EXPECT_EQ(status, OK);
2215 });
2216 /**
2217 * @tc.steps: step4. sync again
2218 * @tc.expected: step4. sync success, only ability sync packet.
2219 */
2220 messageCount = 0;
2221 Sync(delegate2, devices, OK);
2222 EXPECT_EQ(messageCount, 1); // 1 contain ability sync packet
2223 EXPECT_EQ(g_mgr.CloseKvStore(delegate2), OK);
2224 EXPECT_EQ(g_mgr.DeleteKvStore(STORE_ID_2), OK);
2225 g_communicatorAggregator->RegOnDispatch(nullptr);
2226 }
2227
2228 /**
2229 * @tc.name: KVSyncOpt003
2230 * @tc.desc: check time sync and ability sync once
2231 * @tc.type: FUNC
2232 * @tc.require:
2233 * @tc.author: zhangqiquan
2234 */
2235 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt003, TestSize.Level0)
2236 {
2237 /**
2238 * @tc.steps: step1. record packet which send to B
2239 */
2240 std::atomic<int> messageCount = 0;
2241 RegOnDispatchWithoutDataPacket(messageCount);
2242 /**
2243 * @tc.steps: step2. deviceA call sync and wait
2244 * @tc.expected: step2. sync should return OK.
2245 */
2246 std::vector<std::string> devices;
2247 devices.push_back(g_deviceB->GetDeviceId());
2248 Sync(devices, OK);
2249 EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
2250 /**
2251 * @tc.steps: step3. reopen kv store
2252 * @tc.expected: step3. reopen OK.
2253 */
2254 ReOpenDB();
2255 /**
2256 * @tc.steps: step4. reopen kv store and sync again
2257 * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
2258 */
2259 messageCount = 0;
2260 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2261 EXPECT_EQ(messageCount, 0);
2262 g_communicatorAggregator->RegOnDispatch(nullptr);
2263 }
2264
2265 /**
2266 * @tc.name: KVSyncOpt004
2267 * @tc.desc: check sync in keys after reopen db
2268 * @tc.type: FUNC
2269 * @tc.require:
2270 * @tc.author: zhangqiquan
2271 */
2272 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt004, TestSize.Level0)
2273 {
2274 /**
2275 * @tc.steps: step1. deviceA call sync and wait
2276 * @tc.expected: step1. sync should return OK.
2277 */
2278 std::vector<std::string> devices;
2279 devices.push_back(g_deviceB->GetDeviceId());
2280 Sync(devices, OK);
2281 /**
2282 * @tc.steps: step2. reopen kv store
2283 * @tc.expected: step2. reopen OK.
2284 */
2285 ReOpenDB();
2286 /**
2287 * @tc.steps: step3. sync with in keys
2288 * @tc.expected: step3. sync OK.
2289 */
2290 std::map<std::string, DBStatus> result;
2291 std::set<Key> condition;
2292 condition.insert({'k'});
2293 Query query = Query::Select().InKeys(condition);
2294 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result, query);
2295 EXPECT_EQ(status, OK);
2296 for (const auto &deviceId : devices) {
2297 EXPECT_EQ(result[deviceId], OK);
2298 }
2299 }
2300
2301 /**
2302 * @tc.name: KVSyncOpt005
2303 * @tc.desc: check record ability finish after receive ability sync
2304 * @tc.type: FUNC
2305 * @tc.require:
2306 * @tc.author: zhangqiquan
2307 */
2308 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt005, TestSize.Level0)
2309 {
2310 /**
2311 * @tc.steps: step1. record packet which send to B
2312 */
2313 std::atomic<int> messageCount = 0;
2314 RegOnDispatchWithoutDataPacket(messageCount, true);
2315 /**
2316 * @tc.steps: step2. deviceB call sync and wait
2317 * @tc.expected: step2. sync should return OK.
2318 */
2319 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2320 EXPECT_EQ(messageCount, 2); // DEV_A send negotiation 2 ack packet.
2321 /**
2322 * @tc.steps: step3. reopen kv store
2323 * @tc.expected: step3. reopen OK.
2324 */
2325 ReOpenDB();
2326 /**
2327 * @tc.steps: step4. reopen kv store and sync again
2328 * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
2329 */
2330 messageCount = 0;
2331 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2332 EXPECT_EQ(messageCount, 0);
2333 g_communicatorAggregator->RegOnDispatch(nullptr);
2334 }
2335
2336 /**
2337 * @tc.name: KVSyncOpt006
2338 * @tc.desc: check time sync and ability sync once after rebuild
2339 * @tc.type: FUNC
2340 * @tc.require:
2341 * @tc.author: zhangqiquan
2342 */
2343 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt006, TestSize.Level0)
2344 {
2345 /**
2346 * @tc.steps: step1. record packet which send to B
2347 */
2348 std::atomic<int> messageCount = 0;
2349 RegOnDispatchWithoutDataPacket(messageCount, true);
2350 /**
2351 * @tc.steps: step2. deviceA call sync and wait
2352 * @tc.expected: step2. sync should return OK.
2353 */
2354 std::vector<std::string> devices;
2355 devices.push_back(g_deviceB->GetDeviceId());
2356 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2357 EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
2358 /**
2359 * @tc.steps: step3. rebuild kv store
2360 * @tc.expected: step3. rebuild OK.
2361 */
2362 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
2363 g_kvDelegatePtr = nullptr;
2364 g_mgr.DeleteKvStore(STORE_ID);
2365 KvStoreNbDelegate::Option option;
2366 option.secOption.securityLabel = SecurityLabel::S3;
2367 option.secOption.securityFlag = SecurityFlag::SECE;
2368 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
2369 ASSERT_TRUE(g_kvDelegateStatus == OK);
2370 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2371 /**
2372 * @tc.steps: step4. rebuild kv store and sync again
2373 * @tc.expected: step4. rebuild OK and sync success, re ability sync.
2374 */
2375 messageCount = 0;
2376 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2377 EXPECT_EQ(messageCount, 1);
2378 g_communicatorAggregator->RegOnDispatch(nullptr);
2379 }
2380
2381 /**
2382 * @tc.name: KVSyncOpt007
2383 * @tc.desc: check re ability sync after import
2384 * @tc.type: FUNC
2385 * @tc.require:
2386 * @tc.author: zhangqiquan
2387 */
2388 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt007, TestSize.Level0)
2389 {
2390 /**
2391 * @tc.steps: step1. record packet which send to B
2392 */
2393 std::atomic<int> messageCount = 0;
2394 RegOnDispatchWithoutDataPacket(messageCount, true);
2395 /**
2396 * @tc.steps: step2. deviceB call sync and wait
2397 * @tc.expected: step2. sync should return OK.
2398 */
2399 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2400 EXPECT_EQ(messageCount, 2); // DEV_A send negotiation 2 ack packet.
2401 /**
2402 * @tc.steps: step3. export and import
2403 * @tc.expected: step3. export and import OK.
2404 */
2405 std::string singleExportFileName = g_testDir + "/KVSyncOpt007.$$";
2406 CipherPassword passwd;
2407 EXPECT_EQ(g_kvDelegatePtr->Export(singleExportFileName, passwd), OK);
2408 EXPECT_EQ(g_kvDelegatePtr->Import(singleExportFileName, passwd), OK);
2409 /**
2410 * @tc.steps: step4. reopen kv store and sync again
2411 * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
2412 */
2413 messageCount = 0;
2414 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2415 EXPECT_EQ(messageCount, 1); // DEV_A send negotiation 1 ack packet.
2416 g_communicatorAggregator->RegOnDispatch(nullptr);
2417 }
2418
2419 /**
2420 * @tc.name: KVSyncOpt008
2421 * @tc.desc: check rebuild open store with NOT_SET.
2422 * @tc.type: FUNC
2423 * @tc.require:
2424 * @tc.author: tankaisheng
2425 */
2426 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt008, TestSize.Level0)
2427 {
2428 /**
2429 * @tc.steps: step1. record packet which send to B
2430 */
2431 std::atomic<int> messageCount = 0;
2432 RegOnDispatchWithoutDataPacket(messageCount, true);
2433 /**
2434 * @tc.steps: step2. deviceA call sync and wait
2435 * @tc.expected: step2. sync should return OK.
2436 */
2437 std::vector<std::string> devices;
2438 devices.push_back(g_deviceB->GetDeviceId());
2439 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2440 EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
2441 /**
2442 * @tc.steps: step3. rebuild kv store
2443 * @tc.expected: step3. rebuild failed.
2444 */
2445 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
2446 g_kvDelegatePtr = nullptr;
2447 g_mgr.DeleteKvStore(STORE_ID);
2448 KvStoreNbDelegate::Option option;
2449 option.secOption.securityLabel = SecurityLabel::NOT_SET;
2450 option.secOption.securityFlag = SecurityFlag::SECE;
2451 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
2452 ASSERT_TRUE(g_kvDelegateStatus == DBStatus::INVALID_ARGS);
2453 ASSERT_TRUE(g_kvDelegatePtr == nullptr);
2454 }
2455
2456 /**
2457 * @tc.name: KVTimeChange001
2458 * @tc.desc: check time sync and ability sync once
2459 * @tc.type: FUNC
2460 * @tc.require:
2461 * @tc.author: zhangqiquan
2462 */
2463 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVTimeChange001, TestSize.Level0)
2464 {
2465 /**
2466 * @tc.steps: step1. record packet which send to B
2467 */
2468 std::atomic<int> messageCount = 0;
2469 RegOnDispatchWithoutDataPacket(messageCount);
2470 /**
2471 * @tc.steps: step2. deviceA call sync and wait
2472 * @tc.expected: step2. sync should return OK.
2473 */
2474 std::vector<std::string> devices;
2475 devices.push_back(g_deviceB->GetDeviceId());
2476 Sync(devices, OK);
2477 EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
2478 /**
2479 * @tc.steps: step3. sync again
2480 * @tc.expected: step3. sync success, no negotiation packet.
2481 */
2482 messageCount = 0;
2483 Sync(devices, OK);
2484 EXPECT_EQ(messageCount, 0);
2485 /**
2486 * @tc.steps: step4. modify time offset and sync again
2487 * @tc.expected: step4. sync success, only time sync packet.
2488 */
2489 RuntimeContext::GetInstance()->NotifyTimestampChanged(100);
2490 RuntimeContext::GetInstance()->RecordAllTimeChange();
2491 RuntimeContext::GetInstance()->ClearAllDeviceTimeInfo();
2492 messageCount = 0;
2493 Sync(devices, OK);
2494 EXPECT_EQ(messageCount, 1); // 1 contain time sync request packet
2495 messageCount = 0;
2496 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2497 EXPECT_EQ(messageCount, 0);
2498 g_communicatorAggregator->RegOnDispatch(nullptr);
2499 }
2500
2501 /**
2502 * @tc.name: KVTimeChange002
2503 * @tc.desc: test NotifyTimestampChanged will not stuck when notify delegate with no metadata
2504 * @tc.type: FUNC
2505 * @tc.require:
2506 * @tc.author: liuhongyang
2507 */
2508 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVTimeChange002, TestSize.Level0)
2509 {
2510 /**
2511 * @tc.steps: step1. open a new store with STORE_ID_3
2512 * @tc.expected: step1. open success
2513 */
2514 KvStoreNbDelegate::Option option;
2515 option.secOption.securityLabel = SecurityLabel::S3;
2516 option.secOption.securityFlag = SecurityFlag::SECE;
2517 KvStoreNbDelegate *delegate2 = nullptr;
__anonde2bd9673202(DBStatus status, KvStoreNbDelegate *delegate) 2518 g_mgr.GetKvStore(STORE_ID_3, option, [&delegate2](DBStatus status, KvStoreNbDelegate *delegate) {
2519 delegate2 = delegate;
2520 EXPECT_EQ(status, OK);
2521 });
2522 ASSERT_TRUE(delegate2 != nullptr);
2523 /**
2524 * @tc.steps: step2. STORE_ID_3 sync once so that it will be notified when time change
2525 * @tc.expected: step2. sync should return OK.
2526 */
2527 std::vector<std::string> devices;
2528 devices.push_back(g_deviceB->GetDeviceId());
2529 std::map<std::string, DBStatus> result;
2530 EXPECT_EQ(g_tool.SyncTest(delegate2, devices, SYNC_MODE_PULL_ONLY, result, true), OK);
2531 /**
2532 * @tc.steps: step3. deviceA call sync and wait
2533 * @tc.expected: step3. sync should return OK.
2534 */
2535 Sync(devices, OK);
2536 /**
2537 * @tc.steps: step4. call NotifyTimestampChanged
2538 * @tc.expected: step4. expect no deadlock
2539 */
2540 RuntimeContext::GetInstance()->NotifyTimestampChanged(100);
2541 /**
2542 * @tc.steps: step5. clean up the created db
2543 */
2544 ASSERT_EQ(g_mgr.CloseKvStore(delegate2), OK);
2545 delegate2 = nullptr;
2546 ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID_3) == OK);
2547 }
2548 }