1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17
18 #include "ability_sync.h"
19 #include "distributeddb_data_generate_unit_test.h"
20 #include "distributeddb_tools_unit_test.h"
21 #include "kv_store_nb_delegate_impl.h"
22 #include "kv_virtual_device.h"
23 #include "platform_specific.h"
24 #include "process_system_api_adapter_impl.h"
25 #include "single_ver_data_packet.h"
26 #include "virtual_communicator_aggregator.h"
27
28 using namespace testing::ext;
29 using namespace DistributedDB;
30 using namespace DistributedDBUnitTest;
31 using namespace std;
32
33 namespace {
34 string g_testDir;
35 const string STORE_ID = "kv_stroe_sync_check_test";
36 const std::string DEVICE_B = "deviceB";
37 const std::string DEVICE_C = "deviceC";
38 const int LOCAL_WATER_MARK_NOT_INIT = 0xaa;
39 const int EIGHT_HUNDRED = 800;
40 const int NORMAL_SYNC_SEND_REQUEST_CNT = 3;
41 const int TWO_CNT = 2;
42 const int SLEEP_MILLISECONDS = 500;
43 const int TEN_SECONDS = 10;
44 const int THREE_HUNDRED = 300;
45 const int WAIT_30_SECONDS = 30000;
46 const int WAIT_40_SECONDS = 40000;
47 const int TIMEOUT_6_SECONDS = 6000;
48
49 KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
50 KvStoreConfig g_config;
51 DistributedDBToolsUnitTest g_tool;
52 DBStatus g_kvDelegateStatus = INVALID_ARGS;
53 KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
54 VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
55 KvVirtualDevice* g_deviceB = nullptr;
56 KvVirtualDevice* g_deviceC = nullptr;
57 VirtualSingleVerSyncDBInterface *g_syncInterfaceB = nullptr;
58 VirtualSingleVerSyncDBInterface *g_syncInterfaceC = nullptr;
59
60 // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
61 auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
62 placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
63 #ifndef LOW_LEVEL_MEM_DEV
64 const int KEY_LEN = 20; // 20 Bytes
65 const int VALUE_LEN = 4 * 1024 * 1024; // 4MB
66 const int ENTRY_NUM = 2; // 16 entries
67 #endif
68
69 class DistributedDBSingleVerP2PSyncCheckTest : public testing::Test {
70 public:
71 static void SetUpTestCase(void);
72 static void TearDownTestCase(void);
73 void SetUp();
74 void TearDown();
75 };
76
SetUpTestCase(void)77 void DistributedDBSingleVerP2PSyncCheckTest::SetUpTestCase(void)
78 {
79 /**
80 * @tc.setup: Init datadir and Virtual Communicator.
81 */
82 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
83 g_config.dataDir = g_testDir;
84 g_mgr.SetKvStoreConfig(g_config);
85
86 string dir = g_testDir + "/single_ver";
87 DIR* dirTmp = opendir(dir.c_str());
88 if (dirTmp == nullptr) {
89 OS::MakeDBDirectory(dir);
90 } else {
91 closedir(dirTmp);
92 }
93
94 g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
95 ASSERT_TRUE(g_communicatorAggregator != nullptr);
96 RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
97
98 std::shared_ptr<ProcessSystemApiAdapterImpl> g_adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
99 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(g_adapter);
100 }
101
TearDownTestCase(void)102 void DistributedDBSingleVerP2PSyncCheckTest::TearDownTestCase(void)
103 {
104 /**
105 * @tc.teardown: Release virtual Communicator and clear data dir.
106 */
107 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
108 LOGE("rm test db files error!");
109 }
110 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
111 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
112 }
113
SetUp(void)114 void DistributedDBSingleVerP2PSyncCheckTest::SetUp(void)
115 {
116 DistributedDBToolsUnitTest::PrintTestCaseInfo();
117 /**
118 * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
119 */
120 KvStoreNbDelegate::Option option;
121 option.secOption.securityLabel = SecurityLabel::S3;
122 option.secOption.securityFlag = SecurityFlag::SECE;
123 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
124 ASSERT_TRUE(g_kvDelegateStatus == OK);
125 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
126 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
127 ASSERT_TRUE(g_deviceB != nullptr);
128 g_syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
129 ASSERT_TRUE(g_syncInterfaceB != nullptr);
130 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, g_syncInterfaceB), E_OK);
131 SecurityOption virtualOption;
132 virtualOption.securityLabel = option.secOption.securityLabel;
133 virtualOption.securityFlag = option.secOption.securityFlag;
134 g_syncInterfaceB->SetSecurityOption(virtualOption);
135
136 g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
137 ASSERT_TRUE(g_deviceC != nullptr);
138 g_syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
139 ASSERT_TRUE(g_syncInterfaceC != nullptr);
140 ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, g_syncInterfaceC), E_OK);
141 g_syncInterfaceC->SetSecurityOption(virtualOption);
142 RuntimeContext::GetInstance()->ClearAllDeviceTimeInfo();
143 }
144
TearDown(void)145 void DistributedDBSingleVerP2PSyncCheckTest::TearDown(void)
146 {
147 /**
148 * @tc.teardown: Release device A, B, C
149 */
150 if (g_kvDelegatePtr != nullptr) {
151 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
152 g_kvDelegatePtr = nullptr;
153 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
154 LOGD("delete kv store status %d", status);
155 ASSERT_TRUE(status == OK);
156 }
157 if (g_deviceB != nullptr) {
158 delete g_deviceB;
159 g_deviceB = nullptr;
160 }
161 if (g_deviceC != nullptr) {
162 delete g_deviceC;
163 g_deviceC = nullptr;
164 }
165 if (g_communicatorAggregator != nullptr) {
166 g_communicatorAggregator->RegOnDispatch(nullptr);
167 }
168 }
169
170 /**
171 * @tc.name: sec option check Sync 001
172 * @tc.desc: if sec option not equal, forbid sync
173 * @tc.type: FUNC
174 * @tc.require: AR000EV1G6
175 * @tc.author: wangchuanqing
176 */
177 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck001, TestSize.Level1)
178 {
179 DBStatus status = OK;
180 std::vector<std::string> devices;
181 devices.push_back(g_deviceB->GetDeviceId());
182 devices.push_back(g_deviceC->GetDeviceId());
183
184 /**
185 * @tc.steps: step1. deviceA put {k1, v1}
186 */
187 Key key = {'1'};
188 Value value = {'1'};
189 status = g_kvDelegatePtr->Put(key, value);
190 ASSERT_TRUE(status == OK);
191
192 ASSERT_TRUE(g_syncInterfaceB != nullptr);
193 ASSERT_TRUE(g_syncInterfaceC != nullptr);
194 SecurityOption secOption{SecurityLabel::S4, SecurityFlag::ECE};
195 g_syncInterfaceB->SetSecurityOption(secOption);
196 g_syncInterfaceC->SetSecurityOption(secOption);
197
198 /**
199 * @tc.steps: step2. deviceA call sync and wait
200 * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
201 */
202 std::map<std::string, DBStatus> result;
203 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
204 ASSERT_TRUE(status == OK);
205
206 ASSERT_TRUE(result.size() == devices.size());
207 for (const auto &pair : result) {
208 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
209 EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
210 }
211 VirtualDataItem item;
212 g_deviceB->GetData(key, item);
213 EXPECT_TRUE(item.value.empty());
214 g_deviceC->GetData(key, item);
215 EXPECT_TRUE(item.value.empty());
216 }
217
218 /**
219 * @tc.name: sec option check Sync 002
220 * @tc.desc: if sec option not equal, forbid sync
221 * @tc.type: FUNC
222 * @tc.require: AR000EV1G6
223 * @tc.author: wangchuanqing
224 */
225 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck002, TestSize.Level1)
226 {
227 DBStatus status = OK;
228 std::vector<std::string> devices;
229 devices.push_back(g_deviceB->GetDeviceId());
230 devices.push_back(g_deviceC->GetDeviceId());
231
232 /**
233 * @tc.steps: step1. deviceA put {k1, v1}
234 */
235 Key key = {'1'};
236 Value value = {'1'};
237 status = g_kvDelegatePtr->Put(key, value);
238 ASSERT_TRUE(status == OK);
239
240 ASSERT_TRUE(g_syncInterfaceC != nullptr);
241 SecurityOption secOption{SecurityLabel::S4, SecurityFlag::ECE};
242 g_syncInterfaceC->SetSecurityOption(secOption);
243 secOption.securityLabel = SecurityLabel::S3;
244 secOption.securityFlag = SecurityFlag::SECE;
245 g_syncInterfaceB->SetSecurityOption(secOption);
246
247 /**
248 * @tc.steps: step2. deviceA call sync and wait
249 * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
250 */
251 std::map<std::string, DBStatus> result;
252 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
253 ASSERT_TRUE(status == OK);
254
255 ASSERT_TRUE(result.size() == devices.size());
256 for (const auto &pair : result) {
257 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
258 if (pair.first == DEVICE_B) {
259 EXPECT_TRUE(pair.second == OK);
260 } else {
261 EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
262 }
263 }
264 VirtualDataItem item;
265 g_deviceC->GetData(key, item);
266 EXPECT_TRUE(item.value.empty());
267 g_deviceB->GetData(key, item);
268 EXPECT_TRUE(item.value == value);
269 }
270
271 /**
272 * @tc.name: sec option check Sync 003
273 * @tc.desc: if sec option equal, check not pass, forbid sync
274 * @tc.type: FUNC
275 * @tc.require: AR000EV1G6
276 * @tc.author: zhangqiquan
277 */
278 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck003, TestSize.Level1)
279 {
280 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
281 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon728505270202(const std::string &, const SecurityOption &) 282 adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
283 return false;
284 });
285 /**
286 * @tc.steps: step1. record packet
287 * @tc.expected: step1. sync should failed in source.
288 */
289 std::atomic<int> messageCount = 0;
__anon728505270302(const std::string &, Message *) 290 g_communicatorAggregator->RegOnDispatch([&messageCount](const std::string &, Message *) {
291 messageCount++;
292 });
293 /**
294 * @tc.steps: step2. deviceA call sync and wait
295 * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
296 */
297 DBStatus status = OK;
298 std::vector<std::string> devices;
299 devices.push_back(g_deviceB->GetDeviceId());
300 std::map<std::string, DBStatus> result;
301 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
302 EXPECT_EQ(status, OK);
303 EXPECT_EQ(messageCount, 4); // 4 = 2 time sync + 2 ability sync
304 for (const auto &pair : result) {
305 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
306 EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
307 }
308 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
309 g_communicatorAggregator->RegOnDispatch(nullptr);
310 }
311
312 /**
313 * @tc.name: sec option check Sync 004
314 * @tc.desc: memory db not check device security
315 * @tc.type: FUNC
316 * @tc.require:
317 * @tc.author: zhangqiquan
318 */
319 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck004, TestSize.Level1)
320 {
321 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
322 g_kvDelegatePtr = nullptr;
323 KvStoreNbDelegate::Option option;
324 option.secOption.securityLabel = SecurityLabel::NOT_SET;
325 option.isMemoryDb = true;
326 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
327 ASSERT_TRUE(g_kvDelegateStatus == OK);
328 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
329
330 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
331 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon728505270402(const std::string &, const SecurityOption &) 332 adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
333 return false;
334 });
__anon728505270502(const std::string &, SecurityOption &securityOption) 335 adapter->ForkGetSecurityOption([](const std::string &, SecurityOption &securityOption) {
336 securityOption.securityLabel = NOT_SET;
337 return OK;
338 });
__anon728505270602(SecurityOption &) 339 g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &) {
340 return -E_NOT_SUPPORT;
341 });
342
343 std::vector<std::string> devices;
344 devices.push_back(g_deviceB->GetDeviceId());
345 std::map<std::string, DBStatus> result;
346 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
347 EXPECT_EQ(status, OK);
348 for (const auto &pair : result) {
349 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
350 EXPECT_TRUE(pair.second == OK);
351 }
352
353 adapter->ForkCheckDeviceSecurityAbility(nullptr);
354 adapter->ForkGetSecurityOption(nullptr);
355 g_syncInterfaceB->ForkGetSecurityOption(nullptr);
356 }
357
358 /**
359 * @tc.name: sec option check Sync 005
360 * @tc.desc: if sec option equal, check not pass, forbid sync
361 * @tc.type: FUNC
362 * @tc.require:
363 * @tc.author: zhangqiquan
364 */
365 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck005, TestSize.Level1)
366 {
367 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
368 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon728505270702(SecurityOption &option) 369 g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &option) {
370 option.securityLabel = NOT_SET;
371 return E_OK;
372 });
__anon728505270802(const std::string &, SecurityOption &securityOption) 373 adapter->ForkGetSecurityOption([](const std::string &, SecurityOption &securityOption) {
374 securityOption.securityLabel = NOT_SET;
375 return OK;
376 });
377
378 std::vector<std::string> devices;
379 devices.push_back(g_deviceB->GetDeviceId());
380 std::map<std::string, DBStatus> result;
381 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
382 EXPECT_EQ(status, OK);
383 for (const auto &pair : result) {
384 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
385 EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
386 }
387
388 adapter->ForkCheckDeviceSecurityAbility(nullptr);
389 adapter->ForkGetSecurityOption(nullptr);
390 g_syncInterfaceB->ForkGetSecurityOption(nullptr);
391 }
392
393 /**
394 * @tc.name: sec option check Sync 006
395 * @tc.desc: memory db not check device security
396 * @tc.type: FUNC
397 * @tc.require:
398 * @tc.author: zhangqiquan
399 */
400 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck006, TestSize.Level0)
401 {
402 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
403 ASSERT_EQ(g_mgr.DeleteKvStore(STORE_ID), OK);
404 g_kvDelegatePtr = nullptr;
405 KvStoreNbDelegate::Option option;
406 option.secOption.securityLabel = SecurityLabel::S1;
407 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
408 ASSERT_TRUE(g_kvDelegateStatus == OK);
409 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
410
411 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
412 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon728505270902(const std::string &, const SecurityOption &) 413 adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
414 return true;
415 });
__anon728505270a02(const std::string &, SecurityOption &securityOption) 416 adapter->ForkGetSecurityOption([](const std::string &, SecurityOption &securityOption) {
417 securityOption.securityLabel = S1;
418 return OK;
419 });
__anon728505270b02(SecurityOption &option) 420 g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &option) {
421 option.securityLabel = SecurityLabel::S0;
422 return E_OK;
423 });
424
425 std::vector<std::string> devices;
426 devices.push_back(g_deviceB->GetDeviceId());
427 std::map<std::string, DBStatus> result;
428 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
429 EXPECT_EQ(status, OK);
430 for (const auto &pair : result) {
431 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
432 EXPECT_TRUE(pair.second == OK);
433 }
434
435 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(std::make_shared<ProcessSystemApiAdapterImpl>());
436 g_syncInterfaceB->ForkGetSecurityOption(nullptr);
437 }
438
439 /**
440 * @tc.name: sec option check Sync 007
441 * @tc.desc: sync should send security option
442 * @tc.type: FUNC
443 * @tc.require:
444 * @tc.author: zhangqiquan
445 */
446 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck007, TestSize.Level0)
447 {
448 /**
449 * @tc.steps: step1. fork check device security ability
450 * @tc.expected: step1. check param option should be S3 SECE.
451 */
452 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
453 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon728505270c02(const std::string &, const SecurityOption &option) 454 adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &option) {
455 EXPECT_EQ(option.securityLabel, SecurityLabel::S3);
456 EXPECT_EQ(option.securityFlag, SecurityFlag::SECE);
457 return true;
458 });
459 /**
460 * @tc.steps: step2. sync twice
461 * @tc.expected: step2. sync success.
462 */
463 std::vector<std::string> devices;
464 devices.push_back(g_deviceB->GetDeviceId());
465 std::map<std::string, DBStatus> result;
466 g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
467 auto status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
468 ASSERT_TRUE(status == OK);
469 ASSERT_TRUE(result.size() == devices.size());
470 for (const auto &pair : result) {
471 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
472 EXPECT_TRUE(pair.second == OK);
473 }
474 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
475 }
476
477 /**
478 * @tc.name: SecOptionCheck008
479 * @tc.desc: pull compress sync when check device ability fail
480 * @tc.type: FUNC
481 * @tc.require:
482 * @tc.author: zhangqiquan
483 */
484 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck008, TestSize.Level0)
485 {
486 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
487 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
488 auto deviceB = g_deviceB->GetDeviceId();
__anon728505270d02(const std::string &dev, const SecurityOption &) 489 adapter->ForkCheckDeviceSecurityAbility([deviceB](const std::string &dev, const SecurityOption &) {
490 if (dev != "real_device") {
491 return true;
492 }
493 return false;
494 });
__anon728505270e02(SecurityOption &option) 495 g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &option) {
496 option.securityLabel = SecurityLabel::S3;
497 option.securityFlag = SecurityFlag::SECE;
498 return E_OK;
499 });
500 g_syncInterfaceB->SetCompressSync(true);
501 std::vector<std::string> devices;
502 devices.push_back(deviceB);
503 std::map<std::string, DBStatus> result;
504 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
505 EXPECT_EQ(status, OK);
506 for (const auto &pair : result) {
507 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
508 EXPECT_EQ(pair.second, SECURITY_OPTION_CHECK_ERROR);
509 }
510
511 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(std::make_shared<ProcessSystemApiAdapterImpl>());
512 g_syncInterfaceB->ForkGetSecurityOption(nullptr);
513 g_syncInterfaceB->SetCompressSync(false);
514 }
515
516 #ifndef LOW_LEVEL_MEM_DEV
517 /**
518 * @tc.name: BigDataSync001
519 * @tc.desc: big data sync push mode.
520 * @tc.type: FUNC
521 * @tc.require: AR000F3OOU
522 * @tc.author: wangchuanqing
523 */
524 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync001, TestSize.Level1)
525 {
526 DBStatus status = OK;
527 std::vector<std::string> devices;
528 devices.push_back(g_deviceB->GetDeviceId());
529 devices.push_back(g_deviceC->GetDeviceId());
530
531 /**
532 * @tc.steps: step1. deviceA put 16 bigData
533 */
534 std::vector<Entry> entries;
535 std::vector<Key> keys;
536 DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
537 for (const auto &entry : entries) {
538 status = g_kvDelegatePtr->Put(entry.key, entry.value);
539 ASSERT_TRUE(status == OK);
540 }
541
542 /**
543 * @tc.steps: step2. deviceA call sync and wait
544 * @tc.expected: step2. sync should return OK.
545 */
546 std::map<std::string, DBStatus> result;
547 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
548 ASSERT_TRUE(status == OK);
549
550 /**
551 * @tc.expected: step2. onComplete should be called, DeviceB,C have {k1,v1}
552 */
553 ASSERT_TRUE(result.size() == devices.size());
554 for (const auto &pair : result) {
555 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
556 EXPECT_TRUE(pair.second == OK);
557 }
558 VirtualDataItem item;
559 for (const auto &entry : entries) {
560 item.value.clear();
561 g_deviceB->GetData(entry.key, item);
562 EXPECT_TRUE(item.value == entry.value);
563 item.value.clear();
564 g_deviceC->GetData(entry.key, item);
565 EXPECT_TRUE(item.value == entry.value);
566 }
567 }
568
569 /**
570 * @tc.name: BigDataSync002
571 * @tc.desc: big data sync pull mode.
572 * @tc.type: FUNC
573 * @tc.require: AR000F3OOU
574 * @tc.author: wangchuanqing
575 */
576 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync002, TestSize.Level1)
577 {
578 DBStatus status = OK;
579 std::vector<std::string> devices;
580 devices.push_back(g_deviceB->GetDeviceId());
581 devices.push_back(g_deviceC->GetDeviceId());
582
583 /**
584 * @tc.steps: step1. deviceA deviceB put bigData
585 */
586 std::vector<Entry> entries;
587 std::vector<Key> keys;
588 DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
589
590 for (uint32_t i = 0; i < entries.size(); i++) {
591 if (i % 2 == 0) {
592 g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
593 } else {
594 g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
595 }
596 }
597
598 /**
599 * @tc.steps: step3. deviceA call pull sync
600 * @tc.expected: step3. sync should return OK.
601 */
602 std::map<std::string, DBStatus> result;
603 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
604 ASSERT_TRUE(status == OK);
605
606 /**
607 * @tc.expected: step3. onComplete should be called, DeviceA have all bigData
608 */
609 ASSERT_TRUE(result.size() == devices.size());
610 for (const auto &pair : result) {
611 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
612 EXPECT_TRUE(pair.second == OK);
613 }
614 for (const auto &entry : entries) {
615 Value value;
616 EXPECT_EQ(g_kvDelegatePtr->Get(entry.key, value), OK);
617 EXPECT_EQ(value, entry.value);
618 }
619 }
620
621 /**
622 * @tc.name: BigDataSync003
623 * @tc.desc: big data sync pushAndPull mode.
624 * @tc.type: FUNC
625 * @tc.require: AR000F3OOV
626 * @tc.author: wangchuanqing
627 */
628 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync003, TestSize.Level1)
629 {
630 DBStatus status = OK;
631 std::vector<std::string> devices;
632 devices.push_back(g_deviceB->GetDeviceId());
633 devices.push_back(g_deviceC->GetDeviceId());
634
635 /**
636 * @tc.steps: step1. deviceA deviceB put bigData
637 */
638 std::vector<Entry> entries;
639 std::vector<Key> keys;
640 DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
641
642 for (uint32_t i = 0; i < entries.size(); i++) {
643 if (i % 3 == 0) { // 0 3 6 9 12 15 for deivec B
644 g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
645 } else if (i % 3 == 1) { // 1 4 7 10 13 16 for device C
646 g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
647 } else { // 2 5 8 11 14 for device A
648 status = g_kvDelegatePtr->Put(entries[i].key, entries[i].value);
649 ASSERT_TRUE(status == OK);
650 }
651 }
652
653 /**
654 * @tc.steps: step3. deviceA call pushAndpull sync
655 * @tc.expected: step3. sync should return OK.
656 */
657 std::map<std::string, DBStatus> result;
658 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
659 ASSERT_TRUE(status == OK);
660
661 /**
662 * @tc.expected: step3. onComplete should be called, DeviceA have all bigData
663 * deviceB and deviceC has deviceA data
664 */
665 ASSERT_TRUE(result.size() == devices.size());
666 for (const auto &pair : result) {
667 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
668 EXPECT_TRUE(pair.second == OK);
669 }
670
671 VirtualDataItem item;
672 for (uint32_t i = 0; i < entries.size(); i++) {
673 Value value;
674 EXPECT_EQ(g_kvDelegatePtr->Get(entries[i].key, value), OK);
675 EXPECT_EQ(value, entries[i].value);
676
677 if (i % 3 == 2) { // 2 5 8 11 14 for device A
678 item.value.clear();
679 g_deviceB->GetData(entries[i].key, item);
680 EXPECT_TRUE(item.value == entries[i].value);
681 item.value.clear();
682 g_deviceC->GetData(entries[i].key, item);
683 EXPECT_TRUE(item.value == entries[i].value);
684 }
685 }
686 }
687 #endif
688
689 /**
690 * @tc.name: PushFinishedNotify 001
691 * @tc.desc: Test remote device push finished notify function.
692 * @tc.type: FUNC
693 * @tc.require: AR000CQS3S
694 * @tc.author: xushaohua
695 */
696 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, PushFinishedNotify001, TestSize.Level1)
697 {
698 std::vector<std::string> devices;
699 devices.push_back(g_deviceB->GetDeviceId());
700
701 /**
702 * @tc.steps: step1. deviceA call SetRemotePushFinishedNotify
703 * @tc.expected: step1. set should return OK.
704 */
705 int pushfinishedFlag = 0;
706 DBStatus status = g_kvDelegatePtr->SetRemotePushFinishedNotify(
__anon728505270f02(const RemotePushNotifyInfo &info) 707 [&pushfinishedFlag](const RemotePushNotifyInfo &info) {
708 EXPECT_TRUE(info.deviceId == DEVICE_B);
709 pushfinishedFlag = 1;
710 });
711 ASSERT_EQ(status, OK);
712
713 /**
714 * @tc.steps: step2. deviceB put k2, v2, and deviceA pull from deviceB
715 * @tc.expected: step2. deviceA can not receive push finished notify
716 */
717 EXPECT_EQ(g_kvDelegatePtr->Put(KEY_2, VALUE_2), OK);
718 std::map<std::string, DBStatus> result;
719 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
720 EXPECT_TRUE(status == OK);
721 EXPECT_EQ(pushfinishedFlag, 0);
722 pushfinishedFlag = 0;
723
724 /**
725 * @tc.steps: step3. deviceB put k3, v3, and deviceA push and pull to deviceB
726 * @tc.expected: step3. deviceA can not receive push finished notify
727 */
728 EXPECT_EQ(g_kvDelegatePtr->Put(KEY_3, VALUE_3), OK);
729 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
730 EXPECT_TRUE(status == OK);
731 EXPECT_EQ(pushfinishedFlag, 0);
732 pushfinishedFlag = 0;
733
734 /**
735 * @tc.steps: step4. deviceA call SetRemotePushFinishedNotify to reset notify
736 * @tc.expected: step4. set should return OK.
737 */
__anon728505271002(const RemotePushNotifyInfo &info) 738 status = g_kvDelegatePtr->SetRemotePushFinishedNotify([&pushfinishedFlag](const RemotePushNotifyInfo &info) {
739 EXPECT_TRUE(info.deviceId == DEVICE_B);
740 pushfinishedFlag = 2;
741 });
742 ASSERT_EQ(status, OK);
743
744 /**
745 * @tc.steps: step5. deviceA call SetRemotePushFinishedNotify set null to unregist
746 * @tc.expected: step5. set should return OK.
747 */
748 status = g_kvDelegatePtr->SetRemotePushFinishedNotify(nullptr);
749 ASSERT_EQ(status, OK);
750 }
751
752 namespace {
RegOnDispatchWithDelayAck(bool & errCodeAck,bool & afterErrAck)753 void RegOnDispatchWithDelayAck(bool &errCodeAck, bool &afterErrAck)
754 {
755 // just delay the busy ack
756 g_communicatorAggregator->RegOnDispatch([&errCodeAck, &afterErrAck](const std::string &dev, Message *inMsg) {
757 if (dev != g_deviceB->GetDeviceId()) {
758 return;
759 }
760 auto *packet = inMsg->GetObject<DataAckPacket>();
761 if (packet != nullptr && packet->GetRecvCode() == -E_BUSY) {
762 errCodeAck = true;
763 while (!afterErrAck) {
764 }
765 LOGW("NOW SEND BUSY ACK");
766 } else if (errCodeAck) {
767 afterErrAck = true;
768 std::this_thread::sleep_for(std::chrono::seconds(1));
769 }
770 });
771 }
772
RegOnDispatchWithOffline(bool & offlineFlag,bool & invalid,condition_variable & conditionOffline)773 void RegOnDispatchWithOffline(bool &offlineFlag, bool &invalid, condition_variable &conditionOffline)
774 {
775 g_communicatorAggregator->RegOnDispatch([&offlineFlag, &invalid, &conditionOffline](
776 const std::string &dev, Message *inMsg) {
777 auto *packet = inMsg->GetObject<DataAckPacket>();
778 if (dev != DEVICE_B) {
779 if (packet != nullptr && (packet->GetRecvCode() == LOCAL_WATER_MARK_NOT_INIT)) {
780 offlineFlag = true;
781 conditionOffline.notify_all();
782 LOGW("[Dispatch] NOTIFY OFFLINE");
783 std::this_thread::sleep_for(std::chrono::microseconds(EIGHT_HUNDRED));
784 }
785 } else if (!invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
786 LOGW("[Dispatch] NOW INVALID THIS MSG");
787 inMsg->SetMessageType(TYPE_INVALID);
788 inMsg->SetMessageId(INVALID_MESSAGE_ID);
789 invalid = true;
790 }
791 });
792 }
793
RegOnDispatchWithInvalidMsg(bool & invalid)794 void RegOnDispatchWithInvalidMsg(bool &invalid)
795 {
796 g_communicatorAggregator->RegOnDispatch([&invalid](
797 const std::string &dev, Message *inMsg) {
798 if (dev == DEVICE_B && !invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
799 LOGW("[Dispatch] NOW INVALID THIS MSG");
800 inMsg->SetMessageType(TYPE_INVALID);
801 inMsg->SetMessageId(INVALID_MESSAGE_ID);
802 invalid = true;
803 }
804 });
805 }
806
PrepareEnv(vector<std::string> & devices,Key & key,Query & query)807 void PrepareEnv(vector<std::string> &devices, Key &key, Query &query)
808 {
809 /**
810 * @tc.steps: step1. ensure the watermark is no zero and finish timeSync and abilitySync
811 * @tc.expected: step1. should return OK.
812 */
813 Value value = {'1'};
814 std::map<std::string, DBStatus> result;
815 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
816
817 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result, query);
818 EXPECT_TRUE(status == OK);
819 ASSERT_TRUE(result[g_deviceB->GetDeviceId()] == OK);
820 }
821
Sync(KvStoreNbDelegate * kvDelegatePtr,vector<std::string> & devices,const DBStatus & targetStatus)822 void Sync(KvStoreNbDelegate *kvDelegatePtr, vector<std::string> &devices, const DBStatus &targetStatus)
823 {
824 std::map<std::string, DBStatus> result;
825 DBStatus status = g_tool.SyncTest(kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result);
826 EXPECT_TRUE(status == OK);
827 for (const auto &deviceId : devices) {
828 ASSERT_TRUE(result[deviceId] == targetStatus);
829 }
830 }
831
Sync(vector<std::string> & devices,const DBStatus & targetStatus)832 void Sync(vector<std::string> &devices, const DBStatus &targetStatus)
833 {
834 Sync(g_kvDelegatePtr, devices, targetStatus);
835 }
836
SyncWithQuery(vector<std::string> & devices,const Query & query,const SyncMode & mode,const DBStatus & targetStatus)837 void SyncWithQuery(vector<std::string> &devices, const Query &query, const SyncMode &mode,
838 const DBStatus &targetStatus)
839 {
840 std::map<std::string, DBStatus> result;
841 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, mode, result, query);
842 EXPECT_TRUE(status == OK);
843 for (const auto &deviceId : devices) {
844 ASSERT_EQ(result[deviceId], targetStatus);
845 }
846 }
847
SyncWithQuery(vector<std::string> & devices,const Query & query,const DBStatus & targetStatus)848 void SyncWithQuery(vector<std::string> &devices, const Query &query, const DBStatus &targetStatus)
849 {
850 SyncWithQuery(devices, query, DistributedDB::SYNC_MODE_PUSH_ONLY, targetStatus);
851 }
852
SyncWithDeviceOffline(vector<std::string> & devices,Key & key,const Query & query)853 void SyncWithDeviceOffline(vector<std::string> &devices, Key &key, const Query &query)
854 {
855 Value value = {'2'};
856 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
857
858 /**
859 * @tc.steps: step2. invalid the sync msg
860 * @tc.expected: step2. should return TIME_OUT.
861 */
862 SyncWithQuery(devices, query, TIME_OUT);
863
864 /**
865 * @tc.steps: step3. device offline when sync
866 * @tc.expected: step3. should return COMM_FAILURE.
867 */
868 SyncWithQuery(devices, query, COMM_FAILURE);
869 }
870
PrepareWaterMarkError(std::vector<std::string> & devices,Query & query)871 void PrepareWaterMarkError(std::vector<std::string> &devices, Query &query)
872 {
873 /**
874 * @tc.steps: step1. prepare data
875 */
876 devices.push_back(g_deviceB->GetDeviceId());
877 g_deviceB->Online();
878
879 Key key = {'1'};
880 query = Query::Select().PrefixKey(key);
881 PrepareEnv(devices, key, query);
882
883 /**
884 * @tc.steps: step2. query sync and set queryWaterMark
885 * @tc.expected: step2. should return OK.
886 */
887 Value value = {'2'};
888 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
889 SyncWithQuery(devices, query, OK);
890
891 /**
892 * @tc.steps: step3. sync and invalid msg for set local device waterMark
893 * @tc.expected: step3. should return TIME_OUT.
894 */
895 bool invalidMsg = false;
896 RegOnDispatchWithInvalidMsg(invalidMsg);
897 value = {'3'};
898 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
899 Sync(devices, TIME_OUT);
900 g_communicatorAggregator->RegOnDispatch(nullptr);
901 }
902
RegOnDispatchWithoutDataPacket(std::atomic<int> & messageCount,bool calResponse=false)903 void RegOnDispatchWithoutDataPacket(std::atomic<int> &messageCount, bool calResponse = false)
904 {
905 g_communicatorAggregator->RegOnDispatch([calResponse, &messageCount](const std::string &dev, Message *msg) {
906 if (msg->GetMessageId() != TIME_SYNC_MESSAGE && msg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
907 return;
908 }
909 if (dev != DEVICE_B || (!calResponse && msg->GetMessageType() != TYPE_REQUEST)) {
910 return;
911 }
912 messageCount++;
913 });
914 }
915
ReOpenDB()916 void ReOpenDB()
917 {
918 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
919 g_kvDelegatePtr = nullptr;
920 KvStoreNbDelegate::Option option;
921 option.secOption.securityLabel = SecurityLabel::S3;
922 option.secOption.securityFlag = SecurityFlag::SECE;
923 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
924 ASSERT_TRUE(g_kvDelegateStatus == OK);
925 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
926 }
927 }
928
929 /**
930 * @tc.name: AckSessionCheck 001
931 * @tc.desc: Test ack session check function.
932 * @tc.type: FUNC
933 * @tc.require: AR000F3OOV
934 * @tc.author: zhangqiquan
935 */
936 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, AckSessionCheck001, TestSize.Level3)
937 {
938 std::vector<std::string> devices;
939 devices.push_back(g_deviceB->GetDeviceId());
940
941 /**
942 * @tc.steps: step1. deviceB sync to deviceA just for timeSync and abilitySync
943 * @tc.expected: step1. should return OK.
944 */
945 ASSERT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
946
947 /**
948 * @tc.steps: step2. deviceA StartTransaction for prevent other sync action deviceB sync will fail
949 * @tc.expected: step2. should return OK.
950 */
951 ASSERT_TRUE(g_kvDelegatePtr->StartTransaction() == OK);
952
953 bool errCodeAck = false;
954 bool afterErrAck = false;
955 RegOnDispatchWithDelayAck(errCodeAck, afterErrAck);
956
957 Key key = {'1'};
958 Value value = {'1'};
959 Timestamp currentTime;
960 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
961 EXPECT_TRUE(g_deviceB->PutData(key, value, currentTime, 0) == E_OK);
962 EXPECT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
963
964 Value outValue;
965 EXPECT_TRUE(g_kvDelegatePtr->Get(key, outValue) == NOT_FOUND);
966
967 /**
968 * @tc.steps: step3. release the writeHandle and try again, sync success
969 * @tc.expected: step3. should return OK.
970 */
971 EXPECT_TRUE(g_kvDelegatePtr->Commit() == OK);
972 EXPECT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
973
974 EXPECT_TRUE(g_kvDelegatePtr->Get(key, outValue) == E_OK);
975 EXPECT_EQ(outValue, value);
976 }
977
978 /**
979 * @tc.name: AckSafeCheck001
980 * @tc.desc: Test ack session check filter all bad ack in device offline scene.
981 * @tc.type: FUNC
982 * @tc.require: AR000F3OOV
983 * @tc.author: zhangqiquan
984 */
985 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, AckSafeCheck001, TestSize.Level3)
986 {
987 std::vector<std::string> devices;
988 devices.push_back(g_deviceB->GetDeviceId());
989 g_deviceB->Online();
990
991 Key key = {'1'};
992 Query query = Query::Select().PrefixKey(key);
993 PrepareEnv(devices, key, query);
994
995 std::condition_variable conditionOnline;
996 std::condition_variable conditionOffline;
997 bool onlineFlag = false;
998 bool invalid = false;
999 bool offlineFlag = false;
__anon728505271602() 1000 thread subThread([&onlineFlag, &conditionOnline, &offlineFlag, &conditionOffline]() {
1001 LOGW("[Dispatch] NOW DEVICES IS OFFLINE");
1002 std::mutex offlineMtx;
1003 std::unique_lock<std::mutex> lck(offlineMtx);
1004 conditionOffline.wait(lck, [&offlineFlag]{ return offlineFlag; });
1005 g_deviceB->Offline();
1006 std::this_thread::sleep_for(std::chrono::seconds(1));
1007 g_deviceB->Online();
1008 onlineFlag = true;
1009 conditionOnline.notify_all();
1010 LOGW("[Dispatch] NOW DEVICES IS ONLINE");
1011 });
1012 subThread.detach();
1013
1014 RegOnDispatchWithOffline(offlineFlag, invalid, conditionOffline);
1015
1016 SyncWithDeviceOffline(devices, key, query);
1017
1018 std::mutex onlineMtx;
1019 std::unique_lock<std::mutex> lck(onlineMtx);
__anon728505271802null1020 conditionOnline.wait(lck, [&onlineFlag]{ return onlineFlag; });
1021
1022 /**
1023 * @tc.steps: step4. sync again if has problem it will sync never end
1024 * @tc.expected: step4. should return OK.
1025 */
1026 SyncWithQuery(devices, query, OK);
1027 }
1028
1029 /**
1030 * @tc.name: WaterMarkCheck001
1031 * @tc.desc: Test waterMark work correct in lost package scene.
1032 * @tc.type: FUNC
1033 * @tc.require: AR000F3OOV
1034 * @tc.author: zhangqiquan
1035 */
1036 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, WaterMarkCheck001, TestSize.Level1)
1037 {
1038 std::vector<std::string> devices;
1039 Query query = Query::Select();
1040 PrepareWaterMarkError(devices, query);
1041
1042 /**
1043 * @tc.steps: step4. sync again see it work correct
1044 * @tc.expected: step4. should return OK.
1045 */
1046 SyncWithQuery(devices, query, OK);
1047 }
1048
1049 /**
1050 * @tc.name: WaterMarkCheck002
1051 * @tc.desc: Test pull work correct in error waterMark scene.
1052 * @tc.type: FUNC
1053 * @tc.require: AR000F3OOV
1054 * @tc.author: zhangqiquan
1055 */
1056 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, WaterMarkCheck002, TestSize.Level1)
1057 {
1058 std::vector<std::string> devices;
1059 Query query = Query::Select();
1060 PrepareWaterMarkError(devices, query);
1061
1062 /**
1063 * @tc.steps: step4. sync again see it work correct
1064 * @tc.expected: step4. should return OK.
1065 */
1066 Key key = {'2'};
1067 ASSERT_TRUE(g_kvDelegatePtr->Put(key, {}) == OK);
1068 query = Query::Select();
1069 SyncWithQuery(devices, query, DistributedDB::SYNC_MODE_PULL_ONLY, OK);
1070
1071 VirtualDataItem item;
1072 EXPECT_EQ(g_deviceB->GetData(key, item), -E_NOT_FOUND);
1073 }
1074
RegOnDispatchToGetSyncCount(int & sendRequestCount,int sleepMs=0)1075 void RegOnDispatchToGetSyncCount(int &sendRequestCount, int sleepMs = 0)
1076 {
1077 g_communicatorAggregator->RegOnDispatch([sleepMs, &sendRequestCount](
1078 const std::string &dev, Message *inMsg) {
1079 if (dev == DEVICE_B && inMsg->GetMessageType() == TYPE_REQUEST) {
1080 std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
1081 sendRequestCount++;
1082 LOGD("sendRequestCount++...");
1083 }
1084 });
1085 }
1086
TestDifferentSyncMode(SyncMode mode)1087 void TestDifferentSyncMode(SyncMode mode)
1088 {
1089 std::vector<std::string> devices;
1090 devices.push_back(g_deviceB->GetDeviceId());
1091
1092 /**
1093 * @tc.steps: step1. deviceA put {k1, v1}
1094 */
1095 Key key = {'1'};
1096 Value value = {'1'};
1097 DBStatus status = g_kvDelegatePtr->Put(key, value);
1098 ASSERT_TRUE(status == OK);
1099
1100 int sendRequestCount = 0;
1101 RegOnDispatchToGetSyncCount(sendRequestCount);
1102
1103 /**
1104 * @tc.steps: step2. deviceA call sync and wait
1105 * @tc.expected: step2. sync should return OK.
1106 */
1107 std::map<std::string, DBStatus> result;
1108 status = g_tool.SyncTest(g_kvDelegatePtr, devices, mode, result);
1109 ASSERT_TRUE(status == OK);
1110
1111 /**
1112 * @tc.expected: step2. onComplete should be called, DeviceB have {k1,v1}, send request message 3 times
1113 */
1114 ASSERT_TRUE(result.size() == devices.size());
1115 for (const auto &pair : result) {
1116 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1117 EXPECT_TRUE(pair.second == OK);
1118 }
1119 VirtualDataItem item;
1120 g_deviceB->GetData(key, item);
1121 EXPECT_TRUE(item.value == value);
1122
1123 EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1124
1125 /**
1126 * @tc.steps: step3. reset sendRequestCount to 0, deviceA call sync and wait again without any change in db
1127 * @tc.expected: step3. sync should return OK, and sendRequestCount should be 1, because this merge can not
1128 * be skipped
1129 */
1130 sendRequestCount = 0;
1131 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1132 ASSERT_TRUE(status == OK);
1133 EXPECT_EQ(sendRequestCount, 1);
1134 }
1135
1136 /**
1137 * @tc.name: PushSyncMergeCheck001
1138 * @tc.desc: Test push sync task merge, task can not be merged when the two sync task is not in the queue
1139 * at the same time.
1140 * @tc.type: FUNC
1141 * @tc.require: AR000F3OOV
1142 * @tc.author: zhangshijie
1143 */
1144 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck001, TestSize.Level1)
1145 {
1146 TestDifferentSyncMode(SYNC_MODE_PUSH_ONLY);
1147 }
1148
1149 /**
1150 * @tc.name: PushSyncMergeCheck002
1151 * @tc.desc: Test push_pull sync task merge, task can not be merged when the two sync task is not in the queue
1152 * at the same time.
1153 * @tc.type: FUNC
1154 * @tc.require: AR000F3OOV
1155 * @tc.author: zhangshijie
1156 */
1157 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck002, TestSize.Level1)
1158 {
1159 TestDifferentSyncMode(SYNC_MODE_PUSH_PULL);
1160 }
1161
PrepareForSyncMergeTest(std::vector<std::string> & devices,int & sendRequestCount)1162 void PrepareForSyncMergeTest(std::vector<std::string> &devices, int &sendRequestCount)
1163 {
1164 /**
1165 * @tc.steps: step1. deviceA put {k1, v1}
1166 */
1167 Key key = {'1'};
1168 Value value = {'1'};
1169 DBStatus status = g_kvDelegatePtr->Put(key, value);
1170 ASSERT_TRUE(status == OK);
1171
1172 RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1173
1174 /**
1175 * @tc.steps: step2. deviceA call sync and don't wait
1176 * @tc.expected: step2. sync should return OK.
1177 */
1178 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
1179 [&sendRequestCount, devices, key, value](const std::map<std::string, DBStatus>& statusMap) {
1180 ASSERT_TRUE(statusMap.size() == devices.size());
1181 for (const auto &pair : statusMap) {
1182 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1183 EXPECT_TRUE(pair.second == OK);
1184 }
1185 VirtualDataItem item;
1186 g_deviceB->GetData(key, item);
1187 EXPECT_EQ(item.value, value);
1188 EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1189
1190 // reset sendRequestCount to 0
1191 sendRequestCount = 0;
1192 });
1193 ASSERT_TRUE(status == OK);
1194 }
1195
1196 /**
1197 * @tc.name: PushSyncMergeCheck003
1198 * @tc.desc: Test push sync task merge, task can not be merged when there is change in db since last push sync
1199 * @tc.type: FUNC
1200 * @tc.require: AR000F3OOV
1201 * @tc.author: zhangshijie
1202 */
1203 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck003, TestSize.Level3)
1204 {
1205 DBStatus status = OK;
1206 std::vector<std::string> devices;
1207 devices.push_back(g_deviceB->GetDeviceId());
1208
1209 int sendRequestCount = 0;
1210 PrepareForSyncMergeTest(devices, sendRequestCount);
1211
1212 /**
1213 * @tc.steps: step3. deviceA call sync and don't wait
1214 * @tc.expected: step3. sync should return OK.
1215 */
1216 Key key = {'1'};
1217 Value value = {'2'};
1218 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon728505271b02(const std::map<std::string, DBStatus>& statusMap) 1219 [&sendRequestCount, devices, key, value, this](const std::map<std::string, DBStatus>& statusMap) {
1220 /**
1221 * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1222 * skipped, but it is no need to do time sync and ability sync, only need to do data sync
1223 */
1224 ASSERT_TRUE(statusMap.size() == devices.size());
1225 for (const auto &pair : statusMap) {
1226 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1227 EXPECT_TRUE(pair.second == OK);
1228 }
1229 VirtualDataItem item;
1230 g_deviceB->GetData(key, item);
1231 EXPECT_EQ(item.value, value);
1232 });
1233 ASSERT_TRUE(status == OK);
1234
1235 /**
1236 * @tc.steps: step4. deviceA put {k1, v2}
1237 */
1238 while (sendRequestCount < TWO_CNT) {
1239 std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1240 }
1241 status = g_kvDelegatePtr->Put(key, value);
1242 ASSERT_TRUE(status == OK);
1243 // wait for the second sync task finish
1244 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1245 EXPECT_EQ(sendRequestCount, 1);
1246 }
1247
1248 /**
1249 * @tc.name: PushSyncMergeCheck004
1250 * @tc.desc: Test push sync task merge, task can be merged when there is no change in db since last push sync
1251 * @tc.type: FUNC
1252 * @tc.require: AR000F3OOV
1253 * @tc.author: zhangshijie
1254 */
1255 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck004, TestSize.Level3)
1256 {
1257 DBStatus status = OK;
1258 std::vector<std::string> devices;
1259 devices.push_back(g_deviceB->GetDeviceId());
1260
1261 int sendRequestCount = 0;
1262 PrepareForSyncMergeTest(devices, sendRequestCount);
1263
1264 /**
1265 * @tc.steps: step3. deviceA call sync and don't wait
1266 * @tc.expected: step3. sync should return OK.
1267 */
1268 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon728505271c02(const std::map<std::string, DBStatus>& statusMap) 1269 [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1270 /**
1271 * @tc.expected: when the second sync task return, sendRequestCount should be 0, because this merge can be
1272 * skipped
1273 */
1274 ASSERT_TRUE(statusMap.size() == devices.size());
1275 for (const auto &pair : statusMap) {
1276 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1277 EXPECT_TRUE(pair.second == OK);
1278 }
1279 });
1280 ASSERT_TRUE(status == OK);
1281 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1282 EXPECT_EQ(sendRequestCount, 0);
1283 }
1284
RegOnDispatchWithInvalidMsgAndCnt(int & sendRequestCount,int sleepMs,bool & invalid)1285 void RegOnDispatchWithInvalidMsgAndCnt(int &sendRequestCount, int sleepMs, bool &invalid)
1286 {
1287 g_communicatorAggregator->RegOnDispatch([&sendRequestCount, sleepMs, &invalid](
1288 const std::string &dev, Message *inMsg) {
1289 if (dev == DEVICE_B && !invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
1290 inMsg->SetMessageType(TYPE_INVALID);
1291 inMsg->SetMessageId(INVALID_MESSAGE_ID);
1292 sendRequestCount++;
1293 invalid = true;
1294 LOGW("[Dispatch]invalid THIS MSG, sendRequestCount = %d", sendRequestCount);
1295 std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
1296 }
1297 });
1298 }
1299
1300 /**
1301 * @tc.name: PushSyncMergeCheck005
1302 * @tc.desc: Test push sync task merge, task cannot be merged when the last push sync is failed
1303 * @tc.type: FUNC
1304 * @tc.require: AR000F3OOV
1305 * @tc.author: zhangshijie
1306 */
1307 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck005, TestSize.Level3)
1308 {
1309 DBStatus status = OK;
1310 std::vector<std::string> devices;
1311 devices.push_back(g_deviceB->GetDeviceId());
1312
1313 /**
1314 * @tc.steps: step1. deviceA put {k1, v1}
1315 */
1316 Key key = {'1'};
1317 Value value = {'1'};
1318 status = g_kvDelegatePtr->Put(key, value);
1319 ASSERT_TRUE(status == OK);
1320
1321 int sendRequestCount = 0;
1322 bool invalid = false;
1323 RegOnDispatchWithInvalidMsgAndCnt(sendRequestCount, SLEEP_MILLISECONDS, invalid);
1324
1325 /**
1326 * @tc.steps: step2. deviceA call sync and don't wait
1327 * @tc.expected: step2. sync should return TIME_OUT.
1328 */
1329 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon728505271e02(const std::map<std::string, DBStatus>& statusMap) 1330 [&sendRequestCount, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1331 ASSERT_TRUE(statusMap.size() == devices.size());
1332 for (const auto &deviceId : devices) {
1333 ASSERT_EQ(statusMap.at(deviceId), TIME_OUT);
1334 }
1335 });
1336 EXPECT_TRUE(status == OK);
1337
1338 /**
1339 * @tc.steps: step3. deviceA call sync and don't wait
1340 * @tc.expected: step3. sync should return OK.
1341 */
1342 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon728505271f02(const std::map<std::string, DBStatus>& statusMap) 1343 [key, value, &sendRequestCount, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1344 /**
1345 * @tc.expected: when the second sync task return, sendRequestCount should be 3, because this merge can not be
1346 * skipped, deviceB should have {k1, v1}.
1347 */
1348 ASSERT_TRUE(statusMap.size() == devices.size());
1349 for (const auto &pair : statusMap) {
1350 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1351 EXPECT_EQ(pair.second, OK);
1352 }
1353 VirtualDataItem item;
1354 g_deviceB->GetData(key, item);
1355 EXPECT_EQ(item.value, value);
1356 });
1357 ASSERT_TRUE(status == OK);
1358 while (sendRequestCount < 1) {
1359 std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1360 }
1361 sendRequestCount = 0;
1362 RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1363
1364 // wait for the second sync task finish
1365 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1366 EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1367 }
1368
PrePareForQuerySyncMergeTest(bool isQuerySync,std::vector<std::string> & devices,Key & key,Value & value,int & sendRequestCount)1369 void PrePareForQuerySyncMergeTest(bool isQuerySync, std::vector<std::string> &devices,
1370 Key &key, Value &value, int &sendRequestCount)
1371 {
1372 DBStatus status = OK;
1373 /**
1374 * @tc.steps: step1. deviceA put {k1, v1}...{k10, v10}
1375 */
1376 Query query = Query::Select().PrefixKey(key);
1377 const int dataSize = 10;
1378 for (int i = 0; i < dataSize; i++) {
1379 key.push_back(i);
1380 value.push_back(i);
1381 status = g_kvDelegatePtr->Put(key, value);
1382 ASSERT_TRUE(status == OK);
1383 key.pop_back();
1384 value.pop_back();
1385 }
1386
1387 RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1388 /**
1389 * @tc.steps: step2. deviceA call query sync and don't wait
1390 * @tc.expected: step2. sync should return OK.
1391 */
1392 auto completeCallBack = [&sendRequestCount, &key, &value, dataSize, devices]
1393 (const std::map<std::string, DBStatus>& statusMap) {
1394 ASSERT_TRUE(statusMap.size() == devices.size());
1395 for (const auto &pair : statusMap) {
1396 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1397 EXPECT_EQ(pair.second, OK);
1398 }
1399 // when first sync finish, DeviceB have {k1,v1}, {k3,v3}, {k5,v5} .. send request message 3 times
1400 VirtualDataItem item;
1401 for (int i = 0; i < dataSize; i++) {
1402 key.push_back(i);
1403 value.push_back(i);
1404 g_deviceB->GetData(key, item);
1405 EXPECT_EQ(item.value, value);
1406 key.pop_back();
1407 value.pop_back();
1408 }
1409 EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1410 // reset sendRequestCount to 0
1411 sendRequestCount = 0;
1412 };
1413 if (isQuerySync) {
1414 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, completeCallBack, query, false);
1415 } else {
1416 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, completeCallBack);
1417 }
1418 ASSERT_TRUE(status == OK);
1419 }
1420
1421 /**
1422 * @tc.name: QuerySyncMergeCheck001
1423 * @tc.desc: Test query push sync task merge, task can be merged when there is no change in db since last query sync
1424 * @tc.type: FUNC
1425 * @tc.require: AR000F3OOV
1426 * @tc.author: zhangshijie
1427 */
1428 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck001, TestSize.Level3)
1429 {
1430 std::vector<std::string> devices;
1431 int sendRequestCount = 0;
1432 devices.push_back(g_deviceB->GetDeviceId());
1433
1434 Key key {'1'};
1435 Value value {'1'};
1436 Query query = Query::Select().PrefixKey(key);
1437 PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1438
1439 /**
1440 * @tc.steps: step3. deviceA call query sync and don't wait
1441 * @tc.expected: step3. sync should return OK.
1442 */
1443 DBStatus status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon728505272102(const std::map<std::string, DBStatus>& statusMap) 1444 [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1445 /**
1446 * @tc.expected: when the second sync task return, sendRequestCount should be 0, because this merge can be
1447 * skipped because there is no change in db since last query sync
1448 */
1449 ASSERT_TRUE(statusMap.size() == devices.size());
1450 for (const auto &pair : statusMap) {
1451 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1452 EXPECT_TRUE(pair.second == OK);
1453 }
1454 }, query, false);
1455 ASSERT_TRUE(status == OK);
1456 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1457 EXPECT_EQ(sendRequestCount, 0);
1458 }
1459
1460 /**
1461 * @tc.name: QuerySyncMergeCheck002
1462 * @tc.desc: Test query push sync task merge, task can not be merged when there is change in db since last sync
1463 * @tc.type: FUNC
1464 * @tc.require: AR000F3OOV
1465 * @tc.author: zhangshijie
1466 */
1467 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck002, TestSize.Level3)
1468 {
1469 std::vector<std::string> devices;
1470 int sendRequestCount = 0;
1471 devices.push_back(g_deviceB->GetDeviceId());
1472
1473 Key key {'1'};
1474 Value value {'1'};
1475 Query query = Query::Select().PrefixKey(key);
1476 PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1477
1478 /**
1479 * @tc.steps: step3. deviceA call query sync and don't wait
1480 * @tc.expected: step3. sync should return OK.
1481 */
1482 Value value3{'3'};
1483 DBStatus status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon728505272202(const std::map<std::string, DBStatus>& statusMap) 1484 [&sendRequestCount, devices, key, value3, this](const std::map<std::string, DBStatus>& statusMap) {
1485 /**
1486 * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1487 * skipped when there is change in db since last query sync, deviceB have {k1, v1'}
1488 */
1489 ASSERT_TRUE(statusMap.size() == devices.size());
1490 for (const auto &pair : statusMap) {
1491 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1492 EXPECT_TRUE(pair.second == OK);
1493 }
1494 VirtualDataItem item;
1495 g_deviceB->GetData(key, item);
1496 EXPECT_TRUE(item.value == value3);
1497 EXPECT_EQ(sendRequestCount, 1);
1498 }, query, false);
1499 ASSERT_TRUE(status == OK);
1500
1501 /**
1502 * @tc.steps: step4. deviceA put {k1, v1'}
1503 * @tc.steps: step4. reset sendRequestCount to 0, deviceA call sync and wait
1504 * @tc.expected: step4. sync should return OK, and sendRequestCount should be 1, because this merge can not
1505 * be skipped
1506 */
1507 while (sendRequestCount < TWO_CNT) {
1508 std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1509 }
1510 g_kvDelegatePtr->Put(key, value3);
1511 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1512 }
1513
1514 /**
1515 * @tc.name: QuerySyncMergeCheck003
1516 * @tc.desc: Test query push sync task merge, task can not be merged when then query id is different
1517 * @tc.type: FUNC
1518 * @tc.require: AR000F3OOV
1519 * @tc.author: zhangshijie
1520 */
1521 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck003, TestSize.Level3)
1522 {
1523 std::vector<std::string> devices;
1524 int sendRequestCount = 0;
1525 devices.push_back(g_deviceB->GetDeviceId());
1526
1527 Key key {'1'};
1528 Value value {'1'};
1529 PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1530
1531 /**
1532 * @tc.steps: step3. deviceA call another query sync
1533 * @tc.expected: step3. sync should return OK.
1534 */
1535 Key key2 = {'2'};
1536 Value value2 = {'2'};
1537 DBStatus status = g_kvDelegatePtr->Put(key2, value2);
1538 ASSERT_TRUE(status == OK);
1539 Query query2 = Query::Select().PrefixKey(key2);
1540 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon728505272302(const std::map<std::string, DBStatus>& statusMap) 1541 [&sendRequestCount, key2, value2, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1542 /**
1543 * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1544 * skipped, deviceB have {k2,v2}
1545 */
1546 ASSERT_TRUE(statusMap.size() == devices.size());
1547 for (const auto &pair : statusMap) {
1548 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1549 EXPECT_TRUE(pair.second == OK);
1550 }
1551 VirtualDataItem item;
1552 g_deviceB->GetData(key2, item);
1553 EXPECT_TRUE(item.value == value2);
1554 EXPECT_EQ(sendRequestCount, 1);
1555 }, query2, false);
1556 ASSERT_TRUE(status == OK);
1557 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1558 }
1559
1560 /**
1561 * @tc.name: QuerySyncMergeCheck004
1562 * @tc.desc: Test query push sync task merge, task can be merged when there is no change in db since last push sync
1563 * @tc.type: FUNC
1564 * @tc.require: AR000F3OOV
1565 * @tc.author: zhangshijie
1566 */
1567 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck004, TestSize.Level3)
1568 {
1569 DBStatus status = OK;
1570 std::vector<std::string> devices;
1571 devices.push_back(g_deviceB->GetDeviceId());
1572
1573 Key key {'1'};
1574 Value value {'1'};
1575 int sendRequestCount = 0;
1576 PrePareForQuerySyncMergeTest(false, devices, key, value, sendRequestCount);
1577
1578 /**
1579 * @tc.steps: step3. deviceA call query sync without any change in db
1580 * @tc.expected: step3. sync should return OK, and sendRequestCount should be 0, because this merge can be skipped
1581 */
1582 Query query = Query::Select().PrefixKey(key);
1583 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon728505272402(const std::map<std::string, DBStatus>& statusMap) 1584 [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1585 /**
1586 * @tc.expected step3: when the second sync task return, sendRequestCount should be 0, because this merge
1587 * can be skipped because there is no change in db since last push sync
1588 */
1589 ASSERT_TRUE(statusMap.size() == devices.size());
1590 for (const auto &pair : statusMap) {
1591 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1592 EXPECT_TRUE(pair.second == OK);
1593 }
1594 }, query, false);
1595 ASSERT_TRUE(status == OK);
1596 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1597 EXPECT_EQ(sendRequestCount, 0);
1598 }
1599
1600 /**
1601 * @tc.name: GetDataNotify001
1602 * @tc.desc: Test GetDataNotify function, delay < 30s should sync ok, > 36 should timeout
1603 * @tc.type: FUNC
1604 * @tc.require: AR000D4876
1605 * @tc.author: zhangqiquan
1606 */
1607 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify001, TestSize.Level3)
1608 {
1609 ASSERT_NE(g_kvDelegatePtr, nullptr);
1610 DBStatus status = OK;
1611 std::vector<std::string> devices;
1612 devices.push_back(g_deviceB->GetDeviceId());
1613 const std::string DEVICE_A = "real_device";
1614 /**
1615 * @tc.steps: step1. deviceB set get data delay 40s
1616 */
1617 g_deviceB->DelayGetSyncData(WAIT_40_SECONDS);
1618 g_communicatorAggregator->SetTimeout(DEVICE_A, TIMEOUT_6_SECONDS);
1619
1620 /**
1621 * @tc.steps: step2. deviceA call sync and wait
1622 * @tc.expected: step2. sync should return OK. onComplete should be called, deviceB sync TIME_OUT.
1623 */
1624 std::map<std::string, DBStatus> result;
1625 std::map<std::string, int> virtualRes;
1626 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1627 EXPECT_EQ(status, OK);
1628 EXPECT_EQ(result.size(), devices.size());
1629 EXPECT_EQ(result[DEVICE_B], TIME_OUT);
1630 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1631 Query query = Query::Select();
__anon728505272502(std::map<std::string, int> resMap) 1632 g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1633 virtualRes = std::move(resMap);
1634 }, true);
1635 EXPECT_EQ(virtualRes.size(), devices.size());
1636 EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_TIMEOUT));
1637 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1638
1639 /**
1640 * @tc.steps: step3. deviceB set get data delay 30s
1641 */
1642 g_deviceB->DelayGetSyncData(WAIT_30_SECONDS);
1643 /**
1644 * @tc.steps: step4. deviceA call sync and wait
1645 * @tc.expected: step4. sync should return OK. onComplete should be called, deviceB sync OK.
1646 */
1647 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1648 EXPECT_EQ(status, OK);
1649 EXPECT_EQ(result.size(), devices.size());
1650 EXPECT_EQ(result[DEVICE_B], OK);
1651 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
__anon728505272602(std::map<std::string, int> resMap) 1652 g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1653 virtualRes = std::move(resMap);
1654 }, true);
1655 EXPECT_EQ(virtualRes.size(), devices.size());
1656 EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_FINISHED_ALL));
1657 g_deviceB->DelayGetSyncData(0);
1658 }
1659
1660 /**
1661 * @tc.name: GetDataNotify002
1662 * @tc.desc: Test GetDataNotify function, two device sync each other at same time
1663 * @tc.type: FUNC
1664 * @tc.require: AR000D4876
1665 * @tc.author: zhangqiquan
1666 */
1667 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify002, TestSize.Level3)
1668 {
1669 ASSERT_NE(g_kvDelegatePtr, nullptr);
1670 DBStatus status = OK;
1671 std::vector<std::string> devices;
1672 devices.push_back(g_deviceB->GetDeviceId());
1673 const std::string DEVICE_A = "real_device";
1674
1675 /**
1676 * @tc.steps: step1. deviceA sync first to finish time sync and ability sync
1677 */
1678 std::map<std::string, DBStatus> result;
1679 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1680 EXPECT_EQ(status, OK);
1681 EXPECT_EQ(result.size(), devices.size());
1682 EXPECT_EQ(result[DEVICE_B], OK);
1683 /**
1684 * @tc.steps: step2. deviceB set get data delay 30s
1685 */
1686 g_deviceB->DelayGetSyncData(WAIT_30_SECONDS);
1687
1688 /**
1689 * @tc.steps: step3. deviceB call sync and wait
1690 */
__anon728505272702() 1691 std::thread asyncThread([]() {
1692 std::map<std::string, int> virtualRes;
1693 Query query = Query::Select();
1694 g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1695 virtualRes = std::move(resMap);
1696 }, true);
1697 });
1698
1699 /**
1700 * @tc.steps: step4. deviceA call sync and wait
1701 * @tc.expected: step4. sync should return OK. because notify timer trigger (30s - 1s)/2s => 15times
1702 */
1703 std::this_thread::sleep_for(std::chrono::seconds(1));
1704 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1705 EXPECT_EQ(status, OK);
1706 EXPECT_EQ(result.size(), devices.size());
1707 EXPECT_EQ(result[DEVICE_B], OK);
1708 asyncThread.join();
1709 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1710 }
1711
1712 /**
1713 * @tc.name: DelaySync001
 * @tc.desc: Test that delaying the first packet will not affect data conflict
1715 * @tc.type: FUNC
1716 * @tc.require:
1717 * @tc.author: zqq
1718 */
1719 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, DelaySync001, TestSize.Level3)
1720 {
1721 // B put (k, b) after A put (k, a)
1722 Key key = {'k'};
1723 Value aValue = {'a'};
1724 g_kvDelegatePtr->Put(key, aValue);
1725 std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for data conflict
1726 Timestamp currentTime = TimeHelper::GetSysCurrentTime() + TimeHelper::BASE_OFFSET;
1727 Value bValue = {'b'};
1728 EXPECT_EQ(g_deviceB->PutData(key, bValue, currentTime, 0), E_OK);
1729
1730 // delay time sync message, delay time/2 should greater than put sleep time
1731 g_communicatorAggregator->SetTimeout(DEVICE_B, DBConstant::MAX_TIMEOUT);
1732 g_communicatorAggregator->SetTimeout("real_device", DBConstant::MAX_TIMEOUT);
__anon728505272902(const std::string &dstTarget, const Message *msg) 1733 g_communicatorAggregator->RegBeforeDispatch([](const std::string &dstTarget, const Message *msg) {
1734 if (dstTarget == DEVICE_B && msg->GetMessageId() == MessageId::TIME_SYNC_MESSAGE) {
1735 std::this_thread::sleep_for(std::chrono::seconds(3)); // sleep for 3s
1736 }
1737 });
1738
1739 // A call sync and (k, b) in A
1740 std::vector<std::string> devices;
1741 devices.push_back(g_deviceB->GetDeviceId());
1742 std::map<std::string, DBStatus> result;
1743 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1744 EXPECT_EQ(status, OK);
1745 EXPECT_EQ(result.size(), devices.size());
1746 EXPECT_EQ(result[DEVICE_B], OK);
1747
1748 Value actualValue;
1749 g_kvDelegatePtr->Get(key, actualValue);
1750 EXPECT_EQ(actualValue, bValue);
1751 g_communicatorAggregator->RegBeforeDispatch(nullptr);
1752 }
1753
1754 /**
1755 * @tc.name: KVAbilitySyncOpt001
1756 * @tc.desc: check ability sync 2 packet
1757 * @tc.type: FUNC
1758 * @tc.require:
1759 * @tc.author: zhangqiquan
1760 */
1761 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVAbilitySyncOpt001, TestSize.Level0)
1762 {
1763 /**
1764 * @tc.steps: step1. record packet
1765 * @tc.expected: step1. sync should failed in source.
1766 */
1767 std::atomic<int> messageCount = 0;
__anon728505272a02(const std::string &dev, Message *msg) 1768 g_communicatorAggregator->RegOnDispatch([&messageCount](const std::string &dev, Message *msg) {
1769 if (msg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
1770 return;
1771 }
1772 messageCount++;
1773 EXPECT_GE(g_kvDelegatePtr->GetTaskCount(), 1);
1774 });
1775 /**
1776 * @tc.steps: step2. deviceA call sync and wait
1777 * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
1778 */
1779 DBStatus status = OK;
1780 std::vector<std::string> devices;
1781 devices.push_back(g_deviceB->GetDeviceId());
1782 std::map<std::string, DBStatus> result;
1783 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1784 EXPECT_EQ(status, OK);
1785 EXPECT_EQ(messageCount, 2); // 2 ability sync
1786 for (const auto &pair : result) {
1787 EXPECT_EQ(pair.second, OK);
1788 }
1789 }
1790
1791 /**
1792 * @tc.name: KVAbilitySyncOpt002
1793 * @tc.desc: check get task count while conn is nullptr.
1794 * @tc.type: FUNC
1795 * @tc.require:
1796 * @tc.author: caihaoting
1797 */
1798 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVAbilitySyncOpt002, TestSize.Level0)
1799 {
1800 /**
1801 * @tc.steps: step1. record packet while conn is nullptr.
1802 * @tc.expected: step1. sync should failed in source and get task count return DB_ERROR.
1803 */
1804 auto kvStoreImpl = static_cast<KvStoreNbDelegateImpl *>(g_kvDelegatePtr);
1805 EXPECT_EQ(kvStoreImpl->Close(), OK);
1806 std::atomic<int> messageCount = 0;
__anon728505272b02(const std::string &dev, Message *msg) 1807 g_communicatorAggregator->RegOnDispatch([&messageCount](const std::string &dev, Message *msg) {
1808 if (msg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
1809 return;
1810 }
1811 messageCount++;
1812 EXPECT_EQ(g_kvDelegatePtr->GetTaskCount(), DB_ERROR);
1813 });
1814 }
1815
1816 /**
1817 * @tc.name: KVSyncOpt001
1818 * @tc.desc: check time sync and ability sync once
1819 * @tc.type: FUNC
1820 * @tc.require:
1821 * @tc.author: zhangqiquan
1822 */
1823 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt001, TestSize.Level0)
1824 {
1825 /**
1826 * @tc.steps: step1. record packet which send to B
1827 */
1828 std::atomic<int> messageCount = 0;
1829 RegOnDispatchWithoutDataPacket(messageCount);
1830 /**
1831 * @tc.steps: step2. deviceA call sync and wait
1832 * @tc.expected: step2. sync should return OK.
1833 */
1834 std::vector<std::string> devices;
1835 devices.push_back(g_deviceB->GetDeviceId());
1836 Sync(devices, OK);
1837 EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
1838 /**
1839 * @tc.steps: step3. reopen kv store
1840 * @tc.expected: step3. reopen OK.
1841 */
1842 ReOpenDB();
1843 /**
1844 * @tc.steps: step4. reopen kv store and sync again
1845 * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
1846 */
1847 messageCount = 0;
1848 Sync(devices, OK);
1849 EXPECT_EQ(messageCount, 0);
1850 g_communicatorAggregator->RegOnDispatch(nullptr);
1851 }
1852
1853 /**
1854 * @tc.name: KVSyncOpt002
1855 * @tc.desc: check device time sync once
1856 * @tc.type: FUNC
1857 * @tc.require:
1858 * @tc.author: zhangqiquan
1859 */
1860 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt002, TestSize.Level0)
1861 {
1862 /**
1863 * @tc.steps: step1. record packet which send to B
1864 */
1865 std::atomic<int> messageCount = 0;
1866 RegOnDispatchWithoutDataPacket(messageCount);
1867 /**
1868 * @tc.steps: step2. deviceA call sync and wait
1869 * @tc.expected: step2. sync should return OK.
1870 */
1871 std::vector<std::string> devices;
1872 devices.push_back(g_deviceB->GetDeviceId());
1873 Sync(devices, OK);
1874 EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
1875 // close kv store avoid packet dispatch error
1876 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1877 g_kvDelegatePtr = nullptr;
1878 ASSERT_EQ(g_mgr.DeleteKvStore(STORE_ID), OK);
1879 EXPECT_TRUE(RuntimeContext::GetInstance()->IsTimeTickMonitorValid());
1880 /**
1881 * @tc.steps: step3. open new kv store
1882 * @tc.expected: step3. open OK.
1883 */
1884 KvStoreNbDelegate::Option option;
1885 option.secOption.securityLabel = SecurityLabel::S3;
1886 option.secOption.securityFlag = SecurityFlag::SECE;
1887 KvStoreNbDelegate *delegate2 = nullptr;
__anon728505272c02(DBStatus status, KvStoreNbDelegate *delegate) 1888 g_mgr.GetKvStore(STORE_ID_2, option, [&delegate2](DBStatus status, KvStoreNbDelegate *delegate) {
1889 delegate2 = delegate;
1890 EXPECT_EQ(status, OK);
1891 });
1892 /**
1893 * @tc.steps: step4. sync again
1894 * @tc.expected: step4. sync success, only ability sync packet.
1895 */
1896 messageCount = 0;
1897 Sync(delegate2, devices, OK);
1898 EXPECT_EQ(messageCount, 1); // 1 contain ability sync packet
1899 EXPECT_EQ(g_mgr.CloseKvStore(delegate2), OK);
1900 EXPECT_EQ(g_mgr.DeleteKvStore(STORE_ID_2), OK);
1901 g_communicatorAggregator->RegOnDispatch(nullptr);
1902 }
1903
1904 /**
1905 * @tc.name: KVSyncOpt003
1906 * @tc.desc: check time sync and ability sync once
1907 * @tc.type: FUNC
1908 * @tc.require:
1909 * @tc.author: zhangqiquan
1910 */
1911 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt003, TestSize.Level0)
1912 {
1913 /**
1914 * @tc.steps: step1. record packet which send to B
1915 */
1916 std::atomic<int> messageCount = 0;
1917 RegOnDispatchWithoutDataPacket(messageCount);
1918 /**
1919 * @tc.steps: step2. deviceA call sync and wait
1920 * @tc.expected: step2. sync should return OK.
1921 */
1922 std::vector<std::string> devices;
1923 devices.push_back(g_deviceB->GetDeviceId());
1924 Sync(devices, OK);
1925 EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
1926 /**
1927 * @tc.steps: step3. reopen kv store
1928 * @tc.expected: step3. reopen OK.
1929 */
1930 ReOpenDB();
1931 /**
1932 * @tc.steps: step4. reopen kv store and sync again
1933 * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
1934 */
1935 messageCount = 0;
1936 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
1937 EXPECT_EQ(messageCount, 0);
1938 g_communicatorAggregator->RegOnDispatch(nullptr);
1939 }
1940
1941 /**
1942 * @tc.name: KVSyncOpt004
1943 * @tc.desc: check sync in keys after reopen db
1944 * @tc.type: FUNC
1945 * @tc.require:
1946 * @tc.author: zhangqiquan
1947 */
1948 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt004, TestSize.Level0)
1949 {
1950 /**
1951 * @tc.steps: step1. deviceA call sync and wait
1952 * @tc.expected: step1. sync should return OK.
1953 */
1954 std::vector<std::string> devices;
1955 devices.push_back(g_deviceB->GetDeviceId());
1956 Sync(devices, OK);
1957 /**
1958 * @tc.steps: step2. reopen kv store
1959 * @tc.expected: step2. reopen OK.
1960 */
1961 ReOpenDB();
1962 /**
1963 * @tc.steps: step3. sync with in keys
1964 * @tc.expected: step3. sync OK.
1965 */
1966 std::map<std::string, DBStatus> result;
1967 std::set<Key> condition;
1968 condition.insert({'k'});
1969 Query query = Query::Select().InKeys(condition);
1970 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result, query);
1971 EXPECT_EQ(status, OK);
1972 for (const auto &deviceId : devices) {
1973 EXPECT_EQ(result[deviceId], OK);
1974 }
1975 }
1976
1977 /**
1978 * @tc.name: KVSyncOpt005
1979 * @tc.desc: check record ability finish after receive ability sync
1980 * @tc.type: FUNC
1981 * @tc.require:
1982 * @tc.author: zhangqiquan
1983 */
1984 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt005, TestSize.Level0)
1985 {
1986 /**
1987 * @tc.steps: step1. record packet which send to B
1988 */
1989 std::atomic<int> messageCount = 0;
1990 RegOnDispatchWithoutDataPacket(messageCount, true);
1991 /**
1992 * @tc.steps: step2. deviceB call sync and wait
1993 * @tc.expected: step2. sync should return OK.
1994 */
1995 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
1996 EXPECT_EQ(messageCount, 2); // DEV_A send negotiation 2 ack packet.
1997 /**
1998 * @tc.steps: step3. reopen kv store
1999 * @tc.expected: step3. reopen OK.
2000 */
2001 ReOpenDB();
2002 /**
2003 * @tc.steps: step4. reopen kv store and sync again
2004 * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
2005 */
2006 messageCount = 0;
2007 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2008 EXPECT_EQ(messageCount, 0);
2009 g_communicatorAggregator->RegOnDispatch(nullptr);
2010 }
2011
2012 /**
2013 * @tc.name: KVSyncOpt006
2014 * @tc.desc: check time sync and ability sync once after rebuild
2015 * @tc.type: FUNC
2016 * @tc.require:
2017 * @tc.author: zhangqiquan
2018 */
2019 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt006, TestSize.Level0)
2020 {
2021 /**
2022 * @tc.steps: step1. record packet which send to B
2023 */
2024 std::atomic<int> messageCount = 0;
2025 RegOnDispatchWithoutDataPacket(messageCount, true);
2026 /**
2027 * @tc.steps: step2. deviceA call sync and wait
2028 * @tc.expected: step2. sync should return OK.
2029 */
2030 std::vector<std::string> devices;
2031 devices.push_back(g_deviceB->GetDeviceId());
2032 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2033 EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
2034 /**
2035 * @tc.steps: step3. rebuild kv store
2036 * @tc.expected: step3. rebuild OK.
2037 */
2038 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
2039 g_kvDelegatePtr = nullptr;
2040 g_mgr.DeleteKvStore(STORE_ID);
2041 KvStoreNbDelegate::Option option;
2042 option.secOption.securityLabel = SecurityLabel::S3;
2043 option.secOption.securityFlag = SecurityFlag::SECE;
2044 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
2045 ASSERT_TRUE(g_kvDelegateStatus == OK);
2046 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2047 /**
2048 * @tc.steps: step4. rebuild kv store and sync again
2049 * @tc.expected: step4. rebuild OK and sync success, re ability sync.
2050 */
2051 messageCount = 0;
2052 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2053 EXPECT_EQ(messageCount, 1);
2054 g_communicatorAggregator->RegOnDispatch(nullptr);
2055 }
2056
2057 /**
2058 * @tc.name: KVSyncOpt007
2059 * @tc.desc: check re ability sync after import
2060 * @tc.type: FUNC
2061 * @tc.require:
2062 * @tc.author: zhangqiquan
2063 */
2064 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt007, TestSize.Level0)
2065 {
2066 /**
2067 * @tc.steps: step1. record packet which send to B
2068 */
2069 std::atomic<int> messageCount = 0;
2070 RegOnDispatchWithoutDataPacket(messageCount, true);
2071 /**
2072 * @tc.steps: step2. deviceB call sync and wait
2073 * @tc.expected: step2. sync should return OK.
2074 */
2075 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2076 EXPECT_EQ(messageCount, 2); // DEV_A send negotiation 2 ack packet.
2077 /**
2078 * @tc.steps: step3. export and import
2079 * @tc.expected: step3. export and import OK.
2080 */
2081 std::string singleExportFileName = g_testDir + "/KVSyncOpt007.$$";
2082 CipherPassword passwd;
2083 EXPECT_EQ(g_kvDelegatePtr->Export(singleExportFileName, passwd), OK);
2084 EXPECT_EQ(g_kvDelegatePtr->Import(singleExportFileName, passwd), OK);
2085 /**
2086 * @tc.steps: step4. reopen kv store and sync again
2087 * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
2088 */
2089 messageCount = 0;
2090 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2091 EXPECT_EQ(messageCount, 1); // DEV_A send negotiation 1 ack packet.
2092 g_communicatorAggregator->RegOnDispatch(nullptr);
2093 }
2094
2095 /**
2096 * @tc.name: KVTimeChange001
 * @tc.desc: check that only time sync is negotiated again after the local time changes
2098 * @tc.type: FUNC
2099 * @tc.require:
2100 * @tc.author: zhangqiquan
2101 */
2102 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVTimeChange001, TestSize.Level0)
2103 {
2104 /**
2105 * @tc.steps: step1. record packet which send to B
2106 */
2107 std::atomic<int> messageCount = 0;
2108 RegOnDispatchWithoutDataPacket(messageCount);
2109 /**
2110 * @tc.steps: step2. deviceA call sync and wait
2111 * @tc.expected: step2. sync should return OK.
2112 */
2113 std::vector<std::string> devices;
2114 devices.push_back(g_deviceB->GetDeviceId());
2115 Sync(devices, OK);
2116 EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
2117 /**
2118 * @tc.steps: step3. sync again
2119 * @tc.expected: step3. sync success, no negotiation packet.
2120 */
2121 messageCount = 0;
2122 Sync(devices, OK);
2123 EXPECT_EQ(messageCount, 0);
2124 /**
2125 * @tc.steps: step4. modify time offset and sync again
2126 * @tc.expected: step4. sync success, only time sync packet.
2127 */
2128 RuntimeContext::GetInstance()->NotifyTimestampChanged(100);
2129 RuntimeContext::GetInstance()->RecordAllTimeChange();
2130 RuntimeContext::GetInstance()->ClearAllDeviceTimeInfo();
2131 messageCount = 0;
2132 Sync(devices, OK);
2133 EXPECT_EQ(messageCount, 1); // 1 contain time sync request packet
2134 messageCount = 0;
2135 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2136 EXPECT_EQ(messageCount, 0);
2137 g_communicatorAggregator->RegOnDispatch(nullptr);
2138 }
2139 }