1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17
18 #include "distributeddb_data_generate_unit_test.h"
19 #include "distributeddb_tools_unit_test.h"
20 #include "kv_virtual_device.h"
21 #include "platform_specific.h"
22 #include "process_system_api_adapter_impl.h"
23 #include "single_ver_data_packet.h"
24 #include "virtual_communicator_aggregator.h"
25
26 using namespace testing::ext;
27 using namespace DistributedDB;
28 using namespace DistributedDBUnitTest;
29 using namespace std;
30
31 namespace {
32 string g_testDir;
33 const string STORE_ID = "kv_stroe_sync_check_test";
34 const std::string DEVICE_B = "deviceB";
35 const std::string DEVICE_C = "deviceC";
36 const int LOCAL_WATER_MARK_NOT_INIT = 0xaa;
37 const int EIGHT_HUNDRED = 800;
38 const int NORMAL_SYNC_SEND_REQUEST_CNT = 3;
39 const int TWO_CNT = 2;
40 const int SLEEP_MILLISECONDS = 500;
41 const int TEN_SECONDS = 10;
42 const int THREE_HUNDRED = 300;
43 const int WAIT_30_SECONDS = 30000;
44 const int WAIT_40_SECONDS = 40000;
45 const int TIMEOUT_6_SECONDS = 6000;
46
47 KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
48 KvStoreConfig g_config;
49 DistributedDBToolsUnitTest g_tool;
50 DBStatus g_kvDelegateStatus = INVALID_ARGS;
51 KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
52 VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
53 KvVirtualDevice* g_deviceB = nullptr;
54 KvVirtualDevice* g_deviceC = nullptr;
55 VirtualSingleVerSyncDBInterface *g_syncInterfaceB = nullptr;
56 VirtualSingleVerSyncDBInterface *g_syncInterfaceC = nullptr;
57
58 // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
59 auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
60 placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
61 #ifndef LOW_LEVEL_MEM_DEV
62 const int KEY_LEN = 20; // 20 Bytes
63 const int VALUE_LEN = 4 * 1024 * 1024; // 4MB
64 const int ENTRY_NUM = 2; // 16 entries
65 #endif
66 }
67
68 class DistributedDBSingleVerP2PSyncCheckTest : public testing::Test {
69 public:
70 static void SetUpTestCase(void);
71 static void TearDownTestCase(void);
72 void SetUp();
73 void TearDown();
74 };
75
SetUpTestCase(void)76 void DistributedDBSingleVerP2PSyncCheckTest::SetUpTestCase(void)
77 {
78 /**
79 * @tc.setup: Init datadir and Virtual Communicator.
80 */
81 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
82 g_config.dataDir = g_testDir;
83 g_mgr.SetKvStoreConfig(g_config);
84
85 string dir = g_testDir + "/single_ver";
86 DIR* dirTmp = opendir(dir.c_str());
87 if (dirTmp == nullptr) {
88 OS::MakeDBDirectory(dir);
89 } else {
90 closedir(dirTmp);
91 }
92
93 g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
94 ASSERT_TRUE(g_communicatorAggregator != nullptr);
95 RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
96
97 std::shared_ptr<ProcessSystemApiAdapterImpl> g_adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
98 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(g_adapter);
99 }
100
TearDownTestCase(void)101 void DistributedDBSingleVerP2PSyncCheckTest::TearDownTestCase(void)
102 {
103 /**
104 * @tc.teardown: Release virtual Communicator and clear data dir.
105 */
106 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
107 LOGE("rm test db files error!");
108 }
109 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
110 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
111 }
112
SetUp(void)113 void DistributedDBSingleVerP2PSyncCheckTest::SetUp(void)
114 {
115 DistributedDBToolsUnitTest::PrintTestCaseInfo();
116 /**
117 * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
118 */
119 KvStoreNbDelegate::Option option;
120 option.secOption.securityLabel = SecurityLabel::S3;
121 option.secOption.securityFlag = SecurityFlag::SECE;
122 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
123 ASSERT_TRUE(g_kvDelegateStatus == OK);
124 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
125 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
126 ASSERT_TRUE(g_deviceB != nullptr);
127 g_syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
128 ASSERT_TRUE(g_syncInterfaceB != nullptr);
129 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, g_syncInterfaceB), E_OK);
130
131 g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
132 ASSERT_TRUE(g_deviceC != nullptr);
133 g_syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
134 ASSERT_TRUE(g_syncInterfaceC != nullptr);
135 ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, g_syncInterfaceC), E_OK);
136 }
137
TearDown(void)138 void DistributedDBSingleVerP2PSyncCheckTest::TearDown(void)
139 {
140 /**
141 * @tc.teardown: Release device A, B, C
142 */
143 if (g_kvDelegatePtr != nullptr) {
144 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
145 g_kvDelegatePtr = nullptr;
146 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
147 LOGD("delete kv store status %d", status);
148 ASSERT_TRUE(status == OK);
149 }
150 if (g_deviceB != nullptr) {
151 delete g_deviceB;
152 g_deviceB = nullptr;
153 }
154 if (g_deviceC != nullptr) {
155 delete g_deviceC;
156 g_deviceC = nullptr;
157 }
158 if (g_communicatorAggregator != nullptr) {
159 g_communicatorAggregator->RegOnDispatch(nullptr);
160 }
161 }
162
163 /**
164 * @tc.name: sec option check Sync 001
165 * @tc.desc: if sec option not equal, forbid sync
166 * @tc.type: FUNC
167 * @tc.require: AR000EV1G6
168 * @tc.author: wangchuanqing
169 */
170 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck001, TestSize.Level1)
171 {
172 DBStatus status = OK;
173 std::vector<std::string> devices;
174 devices.push_back(g_deviceB->GetDeviceId());
175 devices.push_back(g_deviceC->GetDeviceId());
176
177 /**
178 * @tc.steps: step1. deviceA put {k1, v1}
179 */
180 Key key = {'1'};
181 Value value = {'1'};
182 status = g_kvDelegatePtr->Put(key, value);
183 ASSERT_TRUE(status == OK);
184
185 ASSERT_TRUE(g_syncInterfaceB != nullptr);
186 ASSERT_TRUE(g_syncInterfaceC != nullptr);
187 SecurityOption secOption{SecurityLabel::S4, SecurityFlag::ECE};
188 g_syncInterfaceB->SetSecurityOption(secOption);
189 g_syncInterfaceC->SetSecurityOption(secOption);
190
191 /**
192 * @tc.steps: step2. deviceA call sync and wait
193 * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
194 */
195 std::map<std::string, DBStatus> result;
196 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
197 ASSERT_TRUE(status == OK);
198
199 ASSERT_TRUE(result.size() == devices.size());
200 for (const auto &pair : result) {
201 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
202 EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
203 }
204 VirtualDataItem item;
205 g_deviceB->GetData(key, item);
206 EXPECT_TRUE(item.value.empty());
207 g_deviceC->GetData(key, item);
208 EXPECT_TRUE(item.value.empty());
209 }
210
211 /**
212 * @tc.name: sec option check Sync 002
213 * @tc.desc: if sec option not equal, forbid sync
214 * @tc.type: FUNC
215 * @tc.require: AR000EV1G6
216 * @tc.author: wangchuanqing
217 */
218 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck002, TestSize.Level1)
219 {
220 DBStatus status = OK;
221 std::vector<std::string> devices;
222 devices.push_back(g_deviceB->GetDeviceId());
223 devices.push_back(g_deviceC->GetDeviceId());
224
225 /**
226 * @tc.steps: step1. deviceA put {k1, v1}
227 */
228 Key key = {'1'};
229 Value value = {'1'};
230 status = g_kvDelegatePtr->Put(key, value);
231 ASSERT_TRUE(status == OK);
232
233 ASSERT_TRUE(g_syncInterfaceC != nullptr);
234 SecurityOption secOption{SecurityLabel::S4, SecurityFlag::ECE};
235 g_syncInterfaceC->SetSecurityOption(secOption);
236 secOption.securityLabel = SecurityLabel::S3;
237 secOption.securityFlag = SecurityFlag::SECE;
238 g_syncInterfaceB->SetSecurityOption(secOption);
239
240 /**
241 * @tc.steps: step2. deviceA call sync and wait
242 * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
243 */
244 std::map<std::string, DBStatus> result;
245 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
246 ASSERT_TRUE(status == OK);
247
248 ASSERT_TRUE(result.size() == devices.size());
249 for (const auto &pair : result) {
250 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
251 if (pair.first == DEVICE_B) {
252 EXPECT_TRUE(pair.second == OK);
253 } else {
254 EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
255 }
256 }
257 VirtualDataItem item;
258 g_deviceC->GetData(key, item);
259 EXPECT_TRUE(item.value.empty());
260 g_deviceB->GetData(key, item);
261 EXPECT_TRUE(item.value == value);
262 }
263
264 #ifndef LOW_LEVEL_MEM_DEV
265 /**
266 * @tc.name: BigDataSync001
267 * @tc.desc: big data sync push mode.
268 * @tc.type: FUNC
269 * @tc.require: AR000F3OOU
270 * @tc.author: wangchuanqing
271 */
272 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync001, TestSize.Level1)
273 {
274 DBStatus status = OK;
275 std::vector<std::string> devices;
276 devices.push_back(g_deviceB->GetDeviceId());
277 devices.push_back(g_deviceC->GetDeviceId());
278
279 /**
280 * @tc.steps: step1. deviceA put 16 bigData
281 */
282 std::vector<Entry> entries;
283 std::vector<Key> keys;
284 DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
285 for (const auto &entry : entries) {
286 status = g_kvDelegatePtr->Put(entry.key, entry.value);
287 ASSERT_TRUE(status == OK);
288 }
289
290 /**
291 * @tc.steps: step2. deviceA call sync and wait
292 * @tc.expected: step2. sync should return OK.
293 */
294 std::map<std::string, DBStatus> result;
295 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
296 ASSERT_TRUE(status == OK);
297
298 /**
299 * @tc.expected: step2. onComplete should be called, DeviceB,C have {k1,v1}
300 */
301 ASSERT_TRUE(result.size() == devices.size());
302 for (const auto &pair : result) {
303 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
304 EXPECT_TRUE(pair.second == OK);
305 }
306 VirtualDataItem item;
307 for (const auto &entry : entries) {
308 item.value.clear();
309 g_deviceB->GetData(entry.key, item);
310 EXPECT_TRUE(item.value == entry.value);
311 item.value.clear();
312 g_deviceC->GetData(entry.key, item);
313 EXPECT_TRUE(item.value == entry.value);
314 }
315 }
316
317 /**
318 * @tc.name: BigDataSync002
319 * @tc.desc: big data sync pull mode.
320 * @tc.type: FUNC
321 * @tc.require: AR000F3OOU
322 * @tc.author: wangchuanqing
323 */
324 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync002, TestSize.Level1)
325 {
326 DBStatus status = OK;
327 std::vector<std::string> devices;
328 devices.push_back(g_deviceB->GetDeviceId());
329 devices.push_back(g_deviceC->GetDeviceId());
330
331 /**
332 * @tc.steps: step1. deviceA deviceB put bigData
333 */
334 std::vector<Entry> entries;
335 std::vector<Key> keys;
336 DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
337
338 for (uint32_t i = 0; i < entries.size(); i++) {
339 if (i % 2 == 0) {
340 g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
341 } else {
342 g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
343 }
344 }
345
346 /**
347 * @tc.steps: step3. deviceA call pull sync
348 * @tc.expected: step3. sync should return OK.
349 */
350 std::map<std::string, DBStatus> result;
351 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
352 ASSERT_TRUE(status == OK);
353
354 /**
355 * @tc.expected: step3. onComplete should be called, DeviceA have all bigData
356 */
357 ASSERT_TRUE(result.size() == devices.size());
358 for (const auto &pair : result) {
359 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
360 EXPECT_TRUE(pair.second == OK);
361 }
362 for (const auto &entry : entries) {
363 Value value;
364 EXPECT_EQ(g_kvDelegatePtr->Get(entry.key, value), OK);
365 EXPECT_EQ(value, entry.value);
366 }
367 }
368
369 /**
370 * @tc.name: BigDataSync003
371 * @tc.desc: big data sync pushAndPull mode.
372 * @tc.type: FUNC
373 * @tc.require: AR000F3OOV
374 * @tc.author: wangchuanqing
375 */
376 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync003, TestSize.Level1)
377 {
378 DBStatus status = OK;
379 std::vector<std::string> devices;
380 devices.push_back(g_deviceB->GetDeviceId());
381 devices.push_back(g_deviceC->GetDeviceId());
382
383 /**
384 * @tc.steps: step1. deviceA deviceB put bigData
385 */
386 std::vector<Entry> entries;
387 std::vector<Key> keys;
388 DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
389
390 for (uint32_t i = 0; i < entries.size(); i++) {
391 if (i % 3 == 0) { // 0 3 6 9 12 15 for deivec B
392 g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
393 } else if (i % 3 == 1) { // 1 4 7 10 13 16 for device C
394 g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
395 } else { // 2 5 8 11 14 for device A
396 status = g_kvDelegatePtr->Put(entries[i].key, entries[i].value);
397 ASSERT_TRUE(status == OK);
398 }
399 }
400
401 /**
402 * @tc.steps: step3. deviceA call pushAndpull sync
403 * @tc.expected: step3. sync should return OK.
404 */
405 std::map<std::string, DBStatus> result;
406 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
407 ASSERT_TRUE(status == OK);
408
409 /**
410 * @tc.expected: step3. onComplete should be called, DeviceA have all bigData
411 * deviceB and deviceC has deviceA data
412 */
413 ASSERT_TRUE(result.size() == devices.size());
414 for (const auto &pair : result) {
415 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
416 EXPECT_TRUE(pair.second == OK);
417 }
418
419 VirtualDataItem item;
420 for (uint32_t i = 0; i < entries.size(); i++) {
421 Value value;
422 EXPECT_EQ(g_kvDelegatePtr->Get(entries[i].key, value), OK);
423 EXPECT_EQ(value, entries[i].value);
424
425 if (i % 3 == 2) { // 2 5 8 11 14 for device A
426 item.value.clear();
427 g_deviceB->GetData(entries[i].key, item);
428 EXPECT_TRUE(item.value == entries[i].value);
429 item.value.clear();
430 g_deviceC->GetData(entries[i].key, item);
431 EXPECT_TRUE(item.value == entries[i].value);
432 }
433 }
434 }
435 #endif
436
437 /**
438 * @tc.name: PushFinishedNotify 001
439 * @tc.desc: Test remote device push finished notify function.
440 * @tc.type: FUNC
441 * @tc.require: AR000CQS3S
442 * @tc.author: xushaohua
443 */
444 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, PushFinishedNotify001, TestSize.Level1)
445 {
446 std::vector<std::string> devices;
447 devices.push_back(g_deviceB->GetDeviceId());
448
449 /**
450 * @tc.steps: step1. deviceA call SetRemotePushFinishedNotify
451 * @tc.expected: step1. set should return OK.
452 */
453 int pushfinishedFlag = 0;
454 DBStatus status = g_kvDelegatePtr->SetRemotePushFinishedNotify(
__anon0f1c56880202(const RemotePushNotifyInfo &info) 455 [&pushfinishedFlag](const RemotePushNotifyInfo &info) {
456 EXPECT_TRUE(info.deviceId == DEVICE_B);
457 pushfinishedFlag = 1;
458 });
459 ASSERT_EQ(status, OK);
460
461 /**
462 * @tc.steps: step2. deviceB put k2, v2, and deviceA pull from deviceB
463 * @tc.expected: step2. deviceA can not receive push finished notify
464 */
465 EXPECT_EQ(g_kvDelegatePtr->Put(KEY_2, VALUE_2), OK);
466 std::map<std::string, DBStatus> result;
467 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
468 EXPECT_TRUE(status == OK);
469 EXPECT_EQ(pushfinishedFlag, 0);
470 pushfinishedFlag = 0;
471
472 /**
473 * @tc.steps: step3. deviceB put k3, v3, and deviceA push and pull to deviceB
474 * @tc.expected: step3. deviceA can not receive push finished notify
475 */
476 EXPECT_EQ(g_kvDelegatePtr->Put(KEY_3, VALUE_3), OK);
477 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
478 EXPECT_TRUE(status == OK);
479 EXPECT_EQ(pushfinishedFlag, 0);
480 pushfinishedFlag = 0;
481
482 /**
483 * @tc.steps: step4. deviceA call SetRemotePushFinishedNotify to reset notify
484 * @tc.expected: step4. set should return OK.
485 */
__anon0f1c56880302(const RemotePushNotifyInfo &info) 486 status = g_kvDelegatePtr->SetRemotePushFinishedNotify([&pushfinishedFlag](const RemotePushNotifyInfo &info) {
487 EXPECT_TRUE(info.deviceId == DEVICE_B);
488 pushfinishedFlag = 2;
489 });
490 ASSERT_EQ(status, OK);
491
492 /**
493 * @tc.steps: step5. deviceA call SetRemotePushFinishedNotify set null to unregist
494 * @tc.expected: step5. set should return OK.
495 */
496 status = g_kvDelegatePtr->SetRemotePushFinishedNotify(nullptr);
497 ASSERT_EQ(status, OK);
498 }
499
500 namespace {
RegOnDispatchWithDelayAck(bool & errCodeAck,bool & afterErrAck)501 void RegOnDispatchWithDelayAck(bool &errCodeAck, bool &afterErrAck)
502 {
503 // just delay the busy ack
504 g_communicatorAggregator->RegOnDispatch([&errCodeAck, &afterErrAck](const std::string &dev, Message *inMsg) {
505 if (dev != g_deviceB->GetDeviceId()) {
506 return;
507 }
508 auto *packet = inMsg->GetObject<DataAckPacket>();
509 if (packet->GetRecvCode() == -E_BUSY) {
510 errCodeAck = true;
511 while (!afterErrAck) {
512 }
513 LOGW("NOW SEND BUSY ACK");
514 } else if (errCodeAck) {
515 afterErrAck = true;
516 std::this_thread::sleep_for(std::chrono::seconds(1));
517 }
518 });
519 }
520
RegOnDispatchWithOffline(bool & offlineFlag,bool & invalid,condition_variable & conditionOffline)521 void RegOnDispatchWithOffline(bool &offlineFlag, bool &invalid, condition_variable &conditionOffline)
522 {
523 g_communicatorAggregator->RegOnDispatch([&offlineFlag, &invalid, &conditionOffline](
524 const std::string &dev, Message *inMsg) {
525 auto *packet = inMsg->GetObject<DataAckPacket>();
526 if (dev != DEVICE_B) {
527 if (packet->GetRecvCode() == LOCAL_WATER_MARK_NOT_INIT) {
528 offlineFlag = true;
529 conditionOffline.notify_all();
530 LOGW("[Dispatch] NOTIFY OFFLINE");
531 std::this_thread::sleep_for(std::chrono::microseconds(EIGHT_HUNDRED));
532 }
533 } else if (!invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
534 LOGW("[Dispatch] NOW INVALID THIS MSG");
535 inMsg->SetMessageType(TYPE_INVALID);
536 inMsg->SetMessageId(INVALID_MESSAGE_ID);
537 invalid = true;
538 }
539 });
540 }
541
RegOnDispatchWithInvalidMsg(bool & invalid)542 void RegOnDispatchWithInvalidMsg(bool &invalid)
543 {
544 g_communicatorAggregator->RegOnDispatch([&invalid](
545 const std::string &dev, Message *inMsg) {
546 if (dev == DEVICE_B && !invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
547 LOGW("[Dispatch] NOW INVALID THIS MSG");
548 inMsg->SetMessageType(TYPE_INVALID);
549 inMsg->SetMessageId(INVALID_MESSAGE_ID);
550 invalid = true;
551 }
552 });
553 }
554
PrepareEnv(vector<std::string> & devices,Key & key,Query & query)555 void PrepareEnv(vector<std::string> &devices, Key &key, Query &query)
556 {
557 /**
558 * @tc.steps: step1. ensure the watermark is no zero and finish timeSync and abilitySync
559 * @tc.expected: step1. should return OK.
560 */
561 Value value = {'1'};
562 std::map<std::string, DBStatus> result;
563 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
564
565 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result, query);
566 EXPECT_TRUE(status == OK);
567 ASSERT_TRUE(result[g_deviceB->GetDeviceId()] == OK);
568 }
569
Sync(vector<std::string> & devices,const DBStatus & targetStatus)570 void Sync(vector<std::string> &devices, const DBStatus &targetStatus)
571 {
572 std::map<std::string, DBStatus> result;
573 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result);
574 EXPECT_TRUE(status == OK);
575 for (const auto &deviceId : devices) {
576 ASSERT_TRUE(result[deviceId] == targetStatus);
577 }
578 }
579
SyncWithQuery(vector<std::string> & devices,const Query & query,const SyncMode & mode,const DBStatus & targetStatus)580 void SyncWithQuery(vector<std::string> &devices, const Query &query, const SyncMode &mode,
581 const DBStatus &targetStatus)
582 {
583 std::map<std::string, DBStatus> result;
584 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, mode, result, query);
585 EXPECT_TRUE(status == OK);
586 for (const auto &deviceId : devices) {
587 ASSERT_TRUE(result[deviceId] == targetStatus);
588 }
589 }
590
SyncWithQuery(vector<std::string> & devices,const Query & query,const DBStatus & targetStatus)591 void SyncWithQuery(vector<std::string> &devices, const Query &query, const DBStatus &targetStatus)
592 {
593 SyncWithQuery(devices, query, DistributedDB::SYNC_MODE_PUSH_ONLY, targetStatus);
594 }
595
SyncWithDeviceOffline(vector<std::string> & devices,Key & key,const Query & query)596 void SyncWithDeviceOffline(vector<std::string> &devices, Key &key, const Query &query)
597 {
598 Value value = {'2'};
599 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
600
601 /**
602 * @tc.steps: step2. invalid the sync msg
603 * @tc.expected: step2. should return TIME_OUT.
604 */
605 SyncWithQuery(devices, query, TIME_OUT);
606
607 /**
608 * @tc.steps: step3. device offline when sync
609 * @tc.expected: step3. should return COMM_FAILURE.
610 */
611 SyncWithQuery(devices, query, COMM_FAILURE);
612 }
613
PrepareWaterMarkError(std::vector<std::string> & devices,Query & query)614 void PrepareWaterMarkError(std::vector<std::string> &devices, Query &query)
615 {
616 /**
617 * @tc.steps: step1. prepare data
618 */
619 devices.push_back(g_deviceB->GetDeviceId());
620 g_deviceB->Online();
621
622 Key key = {'1'};
623 query = Query::Select().PrefixKey(key);
624 PrepareEnv(devices, key, query);
625
626 /**
627 * @tc.steps: step2. query sync and set queryWaterMark
628 * @tc.expected: step2. should return OK.
629 */
630 Value value = {'2'};
631 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
632 SyncWithQuery(devices, query, OK);
633
634 /**
635 * @tc.steps: step3. sync and invalid msg for set local device waterMark
636 * @tc.expected: step3. should return TIME_OUT.
637 */
638 bool invalidMsg = false;
639 RegOnDispatchWithInvalidMsg(invalidMsg);
640 value = {'3'};
641 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
642 Sync(devices, TIME_OUT);
643 g_communicatorAggregator->RegOnDispatch(nullptr);
644 }
645 }
646
647 /**
648 * @tc.name: AckSessionCheck 001
649 * @tc.desc: Test ack session check function.
650 * @tc.type: FUNC
651 * @tc.require: AR000F3OOV
652 * @tc.author: zhangqiquan
653 */
654 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, AckSessionCheck001, TestSize.Level3)
655 {
656 std::vector<std::string> devices;
657 devices.push_back(g_deviceB->GetDeviceId());
658
659 /**
660 * @tc.steps: step1. deviceB sync to deviceA just for timeSync and abilitySync
661 * @tc.expected: step1. should return OK.
662 */
663 ASSERT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
664
665 /**
666 * @tc.steps: step2. deviceA StartTransaction for prevent other sync action deviceB sync will fail
667 * @tc.expected: step2. should return OK.
668 */
669 ASSERT_TRUE(g_kvDelegatePtr->StartTransaction() == OK);
670
671 bool errCodeAck = false;
672 bool afterErrAck = false;
673 RegOnDispatchWithDelayAck(errCodeAck, afterErrAck);
674
675 Key key = {'1'};
676 Value value = {'1'};
677 Timestamp currentTime;
678 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
679 EXPECT_TRUE(g_deviceB->PutData(key, value, currentTime, 0) == OK);
680 EXPECT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
681
682 Value outValue;
683 EXPECT_TRUE(g_kvDelegatePtr->Get(key, outValue) == NOT_FOUND);
684
685 /**
686 * @tc.steps: step3. release the writeHandle and try again, sync success
687 * @tc.expected: step3. should return OK.
688 */
689 EXPECT_TRUE(g_kvDelegatePtr->Commit() == OK);
690 EXPECT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
691
692 EXPECT_TRUE(g_kvDelegatePtr->Get(key, outValue) == E_OK);
693 EXPECT_EQ(outValue, value);
694 }
695
696 /**
697 * @tc.name: AckSafeCheck001
698 * @tc.desc: Test ack session check filter all bad ack in device offline scene.
699 * @tc.type: FUNC
700 * @tc.require: AR000F3OOV
701 * @tc.author: zhangqiquan
702 */
703 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, AckSafeCheck001, TestSize.Level3)
704 {
705 std::vector<std::string> devices;
706 devices.push_back(g_deviceB->GetDeviceId());
707 g_deviceB->Online();
708
709 Key key = {'1'};
710 Query query = Query::Select().PrefixKey(key);
711 PrepareEnv(devices, key, query);
712
713 std::condition_variable conditionOnline;
714 std::condition_variable conditionOffline;
715 bool onlineFlag = false;
716 bool invalid = false;
717 bool offlineFlag = false;
__anon0f1c56880802() 718 thread subThread([&onlineFlag, &conditionOnline, &offlineFlag, &conditionOffline]() {
719 LOGW("[Dispatch] NOW DEVICES IS OFFLINE");
720 std::mutex offlineMtx;
721 std::unique_lock<std::mutex> lck(offlineMtx);
722 conditionOffline.wait(lck, [&offlineFlag]{ return offlineFlag; });
723 g_deviceB->Offline();
724 std::this_thread::sleep_for(std::chrono::milliseconds(100));
725 g_deviceB->Online();
726 onlineFlag = true;
727 conditionOnline.notify_all();
728 LOGW("[Dispatch] NOW DEVICES IS ONLINE");
729 });
730 subThread.detach();
731
732 RegOnDispatchWithOffline(offlineFlag, invalid, conditionOffline);
733
734 SyncWithDeviceOffline(devices, key, query);
735
736 std::mutex onlineMtx;
737 std::unique_lock<std::mutex> lck(onlineMtx);
__anon0f1c56880a02null738 conditionOnline.wait(lck, [&onlineFlag]{ return onlineFlag; });
739
740 /**
741 * @tc.steps: step4. sync again if has problem it will sync never end
742 * @tc.expected: step4. should return OK.
743 */
744 SyncWithQuery(devices, query, OK);
745 }
746
747 /**
748 * @tc.name: WaterMarkCheck001
749 * @tc.desc: Test waterMark work correct in lost package scene.
750 * @tc.type: FUNC
751 * @tc.require: AR000F3OOV
752 * @tc.author: zhangqiquan
753 */
754 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, WaterMarkCheck001, TestSize.Level1)
755 {
756 std::vector<std::string> devices;
757 Query query = Query::Select();
758 PrepareWaterMarkError(devices, query);
759
760 /**
761 * @tc.steps: step4. sync again see it work correct
762 * @tc.expected: step4. should return OK.
763 */
764 SyncWithQuery(devices, query, OK);
765 }
766
767 /**
768 * @tc.name: WaterMarkCheck002
769 * @tc.desc: Test pull work correct in error waterMark scene.
770 * @tc.type: FUNC
771 * @tc.require: AR000F3OOV
772 * @tc.author: zhangqiquan
773 */
774 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, WaterMarkCheck002, TestSize.Level1)
775 {
776 std::vector<std::string> devices;
777 Query query = Query::Select();
778 PrepareWaterMarkError(devices, query);
779
780 /**
781 * @tc.steps: step4. sync again see it work correct
782 * @tc.expected: step4. should return OK.
783 */
784 Key key = {'2'};
785 ASSERT_TRUE(g_kvDelegatePtr->Put(key, {}) == OK);
786 query = Query::Select();
787 SyncWithQuery(devices, query, DistributedDB::SYNC_MODE_PULL_ONLY, OK);
788
789 VirtualDataItem item;
790 EXPECT_EQ(g_deviceB->GetData(key, item), -E_NOT_FOUND);
791 }
792
RegOnDispatchToGetSyncCount(int & sendRequestCount,int sleepMs=0)793 void RegOnDispatchToGetSyncCount(int &sendRequestCount, int sleepMs = 0)
794 {
795 g_communicatorAggregator->RegOnDispatch([sleepMs, &sendRequestCount](
796 const std::string &dev, Message *inMsg) {
797 if (dev == DEVICE_B && inMsg->GetMessageType() == TYPE_REQUEST) {
798 std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
799 sendRequestCount++;
800 LOGD("sendRequestCount++...");
801 }
802 });
803 }
804
TestDifferentSyncMode(SyncMode mode)805 void TestDifferentSyncMode(SyncMode mode)
806 {
807 std::vector<std::string> devices;
808 devices.push_back(g_deviceB->GetDeviceId());
809
810 /**
811 * @tc.steps: step1. deviceA put {k1, v1}
812 */
813 Key key = {'1'};
814 Value value = {'1'};
815 DBStatus status = g_kvDelegatePtr->Put(key, value);
816 ASSERT_TRUE(status == OK);
817
818 int sendRequestCount = 0;
819 RegOnDispatchToGetSyncCount(sendRequestCount);
820
821 /**
822 * @tc.steps: step2. deviceA call sync and wait
823 * @tc.expected: step2. sync should return OK.
824 */
825 std::map<std::string, DBStatus> result;
826 status = g_tool.SyncTest(g_kvDelegatePtr, devices, mode, result);
827 ASSERT_TRUE(status == OK);
828
829 /**
830 * @tc.expected: step2. onComplete should be called, DeviceB have {k1,v1}, send request message 3 times
831 */
832 ASSERT_TRUE(result.size() == devices.size());
833 for (const auto &pair : result) {
834 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
835 EXPECT_TRUE(pair.second == OK);
836 }
837 VirtualDataItem item;
838 g_deviceB->GetData(key, item);
839 EXPECT_TRUE(item.value == value);
840
841 EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
842
843 /**
844 * @tc.steps: step3. reset sendRequestCount to 0, deviceA call sync and wait again without any change in db
845 * @tc.expected: step3. sync should return OK, and sendRequestCount should be 1, because this merge can not
846 * be skipped
847 */
848 sendRequestCount = 0;
849 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
850 ASSERT_TRUE(status == OK);
851 EXPECT_EQ(sendRequestCount, 1);
852 }
853
854 /**
855 * @tc.name: PushSyncMergeCheck001
856 * @tc.desc: Test push sync task merge, task can not be merged when the two sync task is not in the queue
857 * at the same time.
858 * @tc.type: FUNC
859 * @tc.require: AR000F3OOV
860 * @tc.author: zhangshijie
861 */
862 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck001, TestSize.Level1)
863 {
864 TestDifferentSyncMode(SYNC_MODE_PUSH_ONLY);
865 }
866
867 /**
868 * @tc.name: PushSyncMergeCheck002
869 * @tc.desc: Test push_pull sync task merge, task can not be merged when the two sync task is not in the queue
870 * at the same time.
871 * @tc.type: FUNC
872 * @tc.require: AR000F3OOV
873 * @tc.author: zhangshijie
874 */
875 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck002, TestSize.Level1)
876 {
877 TestDifferentSyncMode(SYNC_MODE_PUSH_PULL);
878 }
879
PrepareForSyncMergeTest(std::vector<std::string> & devices,int & sendRequestCount)880 void PrepareForSyncMergeTest(std::vector<std::string> &devices, int &sendRequestCount)
881 {
882 /**
883 * @tc.steps: step1. deviceA put {k1, v1}
884 */
885 Key key = {'1'};
886 Value value = {'1'};
887 DBStatus status = g_kvDelegatePtr->Put(key, value);
888 ASSERT_TRUE(status == OK);
889
890 RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
891
892 /**
893 * @tc.steps: step2. deviceA call sync and don't wait
894 * @tc.expected: step2. sync should return OK.
895 */
896 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
897 [&sendRequestCount, devices, key, value](const std::map<std::string, DBStatus>& statusMap) {
898 ASSERT_TRUE(statusMap.size() == devices.size());
899 for (const auto &pair : statusMap) {
900 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
901 EXPECT_TRUE(pair.second == OK);
902 }
903 VirtualDataItem item;
904 g_deviceB->GetData(key, item);
905 EXPECT_EQ(item.value, value);
906 EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
907
908 // reset sendRequestCount to 0
909 sendRequestCount = 0;
910 });
911 ASSERT_TRUE(status == OK);
912 }
913
914 /**
915 * @tc.name: PushSyncMergeCheck003
916 * @tc.desc: Test push sync task merge, task can not be merged when there is change in db since last push sync
917 * @tc.type: FUNC
918 * @tc.require: AR000F3OOV
919 * @tc.author: zhangshijie
920 */
921 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck003, TestSize.Level3)
922 {
923 DBStatus status = OK;
924 std::vector<std::string> devices;
925 devices.push_back(g_deviceB->GetDeviceId());
926
927 int sendRequestCount = 0;
928 PrepareForSyncMergeTest(devices, sendRequestCount);
929
930 /**
931 * @tc.steps: step3. deviceA call sync and don't wait
932 * @tc.expected: step3. sync should return OK.
933 */
934 Key key = {'1'};
935 Value value = {'2'};
936 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon0f1c56880d02(const std::map<std::string, DBStatus>& statusMap) 937 [&sendRequestCount, devices, key, value, this](const std::map<std::string, DBStatus>& statusMap) {
938 /**
939 * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
940 * skipped, but it is no need to do time sync and ability sync, only need to do data sync
941 */
942 ASSERT_TRUE(statusMap.size() == devices.size());
943 for (const auto &pair : statusMap) {
944 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
945 EXPECT_TRUE(pair.second == OK);
946 }
947 VirtualDataItem item;
948 g_deviceB->GetData(key, item);
949 EXPECT_EQ(item.value, value);
950 });
951 ASSERT_TRUE(status == OK);
952
953 /**
954 * @tc.steps: step4. deviceA put {k1, v2}
955 */
956 while (sendRequestCount < TWO_CNT) {
957 std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
958 }
959 status = g_kvDelegatePtr->Put(key, value);
960 ASSERT_TRUE(status == OK);
961 // wait for the second sync task finish
962 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
963 EXPECT_EQ(sendRequestCount, 1);
964 }
965
966 /**
967 * @tc.name: PushSyncMergeCheck004
968 * @tc.desc: Test push sync task merge, task can be merged when there is no change in db since last push sync
969 * @tc.type: FUNC
970 * @tc.require: AR000F3OOV
971 * @tc.author: zhangshijie
972 */
973 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck004, TestSize.Level3)
974 {
975 DBStatus status = OK;
976 std::vector<std::string> devices;
977 devices.push_back(g_deviceB->GetDeviceId());
978
979 int sendRequestCount = 0;
980 PrepareForSyncMergeTest(devices, sendRequestCount);
981
982 /**
983 * @tc.steps: step3. deviceA call sync and don't wait
984 * @tc.expected: step3. sync should return OK.
985 */
986 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon0f1c56880e02(const std::map<std::string, DBStatus>& statusMap) 987 [devices, this](const std::map<std::string, DBStatus>& statusMap) {
988 /**
989 * @tc.expected: when the second sync task return, sendRequestCount should be 0, because this merge can be
990 * skipped
991 */
992 ASSERT_TRUE(statusMap.size() == devices.size());
993 for (const auto &pair : statusMap) {
994 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
995 EXPECT_TRUE(pair.second == OK);
996 }
997 });
998 ASSERT_TRUE(status == OK);
999 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1000 EXPECT_EQ(sendRequestCount, 0);
1001 }
1002
RegOnDispatchWithInvalidMsgAndCnt(int & sendRequestCount,int sleepMs,bool & invalid)1003 void RegOnDispatchWithInvalidMsgAndCnt(int &sendRequestCount, int sleepMs, bool &invalid)
1004 {
1005 g_communicatorAggregator->RegOnDispatch([&sendRequestCount, sleepMs, &invalid](
1006 const std::string &dev, Message *inMsg) {
1007 if (dev == DEVICE_B && !invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
1008 inMsg->SetMessageType(TYPE_INVALID);
1009 inMsg->SetMessageId(INVALID_MESSAGE_ID);
1010 sendRequestCount++;
1011 invalid = true;
1012 LOGW("[Dispatch]invalid THIS MSG, sendRequestCount = %d", sendRequestCount);
1013 std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
1014 }
1015 });
1016 }
1017
1018 /**
1019 * @tc.name: PushSyncMergeCheck005
1020 * @tc.desc: Test push sync task merge, task cannot be merged when the last push sync is failed
1021 * @tc.type: FUNC
1022 * @tc.require: AR000F3OOV
1023 * @tc.author: zhangshijie
1024 */
1025 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck005, TestSize.Level3)
1026 {
1027 DBStatus status = OK;
1028 std::vector<std::string> devices;
1029 devices.push_back(g_deviceB->GetDeviceId());
1030
1031 /**
1032 * @tc.steps: step1. deviceA put {k1, v1}
1033 */
1034 Key key = {'1'};
1035 Value value = {'1'};
1036 status = g_kvDelegatePtr->Put(key, value);
1037 ASSERT_TRUE(status == OK);
1038
1039 int sendRequestCount = 0;
1040 bool invalid = false;
1041 RegOnDispatchWithInvalidMsgAndCnt(sendRequestCount, SLEEP_MILLISECONDS, invalid);
1042
1043 /**
1044 * @tc.steps: step2. deviceA call sync and don't wait
1045 * @tc.expected: step2. sync should return TIME_OUT.
1046 */
1047 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon0f1c56881002(const std::map<std::string, DBStatus>& statusMap) 1048 [&sendRequestCount, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1049 ASSERT_TRUE(statusMap.size() == devices.size());
1050 for (const auto &deviceId : devices) {
1051 ASSERT_EQ(statusMap.at(deviceId), TIME_OUT);
1052 }
1053 });
1054 EXPECT_TRUE(status == OK);
1055
1056 /**
1057 * @tc.steps: step3. deviceA call sync and don't wait
1058 * @tc.expected: step3. sync should return OK.
1059 */
1060 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon0f1c56881102(const std::map<std::string, DBStatus>& statusMap) 1061 [key, value, &sendRequestCount, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1062 /**
1063 * @tc.expected: when the second sync task return, sendRequestCount should be 3, because this merge can not be
1064 * skipped, deviceB should have {k1, v1}.
1065 */
1066 ASSERT_TRUE(statusMap.size() == devices.size());
1067 for (const auto &pair : statusMap) {
1068 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1069 EXPECT_EQ(pair.second, OK);
1070 }
1071 VirtualDataItem item;
1072 g_deviceB->GetData(key, item);
1073 EXPECT_EQ(item.value, value);
1074 });
1075 ASSERT_TRUE(status == OK);
1076 while (sendRequestCount < 1) {
1077 std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1078 }
1079 sendRequestCount = 0;
1080 RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1081
1082 // wait for the second sync task finish
1083 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1084 EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1085 }
1086
PrePareForQuerySyncMergeTest(bool isQuerySync,std::vector<std::string> & devices,Key & key,Value & value,int & sendRequestCount)1087 void PrePareForQuerySyncMergeTest(bool isQuerySync, std::vector<std::string> &devices,
1088 Key &key, Value &value, int &sendRequestCount)
1089 {
1090 DBStatus status = OK;
1091 /**
1092 * @tc.steps: step1. deviceA put {k1, v1}...{k10, v10}
1093 */
1094 Query query = Query::Select().PrefixKey(key);
1095 const int dataSize = 10;
1096 for (int i = 0; i < dataSize; i++) {
1097 key.push_back(i);
1098 value.push_back(i);
1099 status = g_kvDelegatePtr->Put(key, value);
1100 ASSERT_TRUE(status == OK);
1101 key.pop_back();
1102 value.pop_back();
1103 }
1104
1105 RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1106 /**
1107 * @tc.steps: step2. deviceA call query sync and don't wait
1108 * @tc.expected: step2. sync should return OK.
1109 */
1110 auto completeCallBack = [&sendRequestCount, &key, &value, dataSize, devices]
1111 (const std::map<std::string, DBStatus>& statusMap) {
1112 ASSERT_TRUE(statusMap.size() == devices.size());
1113 for (const auto &pair : statusMap) {
1114 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1115 EXPECT_EQ(pair.second, OK);
1116 }
1117 // when first sync finish, DeviceB have {k1,v1}, {k3,v3}, {k5,v5} .. send request message 3 times
1118 VirtualDataItem item;
1119 for (int i = 0; i < dataSize; i++) {
1120 key.push_back(i);
1121 value.push_back(i);
1122 g_deviceB->GetData(key, item);
1123 EXPECT_EQ(item.value, value);
1124 key.pop_back();
1125 value.pop_back();
1126 }
1127 EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1128 // reset sendRequestCount to 0
1129 sendRequestCount = 0;
1130 };
1131 if (isQuerySync) {
1132 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, completeCallBack, query, false);
1133 } else {
1134 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, completeCallBack);
1135 }
1136 ASSERT_TRUE(status == OK);
1137 }
1138
1139 /**
1140 * @tc.name: QuerySyncMergeCheck001
1141 * @tc.desc: Test query push sync task merge, task can be merged when there is no change in db since last query sync
1142 * @tc.type: FUNC
1143 * @tc.require: AR000F3OOV
1144 * @tc.author: zhangshijie
1145 */
1146 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck001, TestSize.Level3)
1147 {
1148 std::vector<std::string> devices;
1149 int sendRequestCount = 0;
1150 devices.push_back(g_deviceB->GetDeviceId());
1151
1152 Key key {'1'};
1153 Value value {'1'};
1154 Query query = Query::Select().PrefixKey(key);
1155 PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1156
1157 /**
1158 * @tc.steps: step3. deviceA call query sync and don't wait
1159 * @tc.expected: step3. sync should return OK.
1160 */
1161 DBStatus status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon0f1c56881302(const std::map<std::string, DBStatus>& statusMap) 1162 [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1163 /**
1164 * @tc.expected: when the second sync task return, sendRequestCount should be 0, because this merge can be
1165 * skipped because there is no change in db since last query sync
1166 */
1167 ASSERT_TRUE(statusMap.size() == devices.size());
1168 for (const auto &pair : statusMap) {
1169 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1170 EXPECT_TRUE(pair.second == OK);
1171 }
1172 }, query, false);
1173 ASSERT_TRUE(status == OK);
1174 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1175 EXPECT_EQ(sendRequestCount, 0);
1176 }
1177
1178 /**
1179 * @tc.name: QuerySyncMergeCheck002
1180 * @tc.desc: Test query push sync task merge, task can not be merged when there is change in db since last sync
1181 * @tc.type: FUNC
1182 * @tc.require: AR000F3OOV
1183 * @tc.author: zhangshijie
1184 */
1185 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck002, TestSize.Level3)
1186 {
1187 std::vector<std::string> devices;
1188 int sendRequestCount = 0;
1189 devices.push_back(g_deviceB->GetDeviceId());
1190
1191 Key key {'1'};
1192 Value value {'1'};
1193 Query query = Query::Select().PrefixKey(key);
1194 PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1195
1196 /**
1197 * @tc.steps: step3. deviceA call query sync and don't wait
1198 * @tc.expected: step3. sync should return OK.
1199 */
1200 Value value3{'3'};
1201 DBStatus status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon0f1c56881402(const std::map<std::string, DBStatus>& statusMap) 1202 [&sendRequestCount, devices, key, value3, this](const std::map<std::string, DBStatus>& statusMap) {
1203 /**
1204 * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1205 * skipped when there is change in db since last query sync, deviceB have {k1, v1'}
1206 */
1207 ASSERT_TRUE(statusMap.size() == devices.size());
1208 for (const auto &pair : statusMap) {
1209 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1210 EXPECT_TRUE(pair.second == OK);
1211 }
1212 VirtualDataItem item;
1213 g_deviceB->GetData(key, item);
1214 EXPECT_TRUE(item.value == value3);
1215 EXPECT_EQ(sendRequestCount, 1);
1216 }, query, false);
1217 ASSERT_TRUE(status == OK);
1218
1219 /**
1220 * @tc.steps: step4. deviceA put {k1, v1'}
1221 * @tc.steps: step4. reset sendRequestCount to 0, deviceA call sync and wait
1222 * @tc.expected: step4. sync should return OK, and sendRequestCount should be 1, because this merge can not
1223 * be skipped
1224 */
1225 while (sendRequestCount < TWO_CNT) {
1226 std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1227 }
1228 g_kvDelegatePtr->Put(key, value3);
1229 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1230 }
1231
1232 /**
1233 * @tc.name: QuerySyncMergeCheck003
1234 * @tc.desc: Test query push sync task merge, task can not be merged when then query id is different
1235 * @tc.type: FUNC
1236 * @tc.require: AR000F3OOV
1237 * @tc.author: zhangshijie
1238 */
1239 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck003, TestSize.Level3)
1240 {
1241 std::vector<std::string> devices;
1242 int sendRequestCount = 0;
1243 devices.push_back(g_deviceB->GetDeviceId());
1244
1245 Key key {'1'};
1246 Value value {'1'};
1247 PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1248
1249 /**
1250 * @tc.steps: step3. deviceA call another query sync
1251 * @tc.expected: step3. sync should return OK.
1252 */
1253 Key key2 = {'2'};
1254 Value value2 = {'2'};
1255 DBStatus status = g_kvDelegatePtr->Put(key2, value2);
1256 ASSERT_TRUE(status == OK);
1257 Query query2 = Query::Select().PrefixKey(key2);
1258 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon0f1c56881502(const std::map<std::string, DBStatus>& statusMap) 1259 [&sendRequestCount, key2, value2, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1260 /**
1261 * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1262 * skipped, deviceB have {k2,v2}
1263 */
1264 ASSERT_TRUE(statusMap.size() == devices.size());
1265 for (const auto &pair : statusMap) {
1266 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1267 EXPECT_TRUE(pair.second == OK);
1268 }
1269 VirtualDataItem item;
1270 g_deviceB->GetData(key2, item);
1271 EXPECT_TRUE(item.value == value2);
1272 EXPECT_EQ(sendRequestCount, 1);
1273 }, query2, false);
1274 ASSERT_TRUE(status == OK);
1275 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1276 }
1277
1278 /**
1279 * @tc.name: QuerySyncMergeCheck004
1280 * @tc.desc: Test query push sync task merge, task can be merged when there is no change in db since last push sync
1281 * @tc.type: FUNC
1282 * @tc.require: AR000F3OOV
1283 * @tc.author: zhangshijie
1284 */
1285 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck004, TestSize.Level3)
1286 {
1287 DBStatus status = OK;
1288 std::vector<std::string> devices;
1289 devices.push_back(g_deviceB->GetDeviceId());
1290
1291 Key key {'1'};
1292 Value value {'1'};
1293 int sendRequestCount = 0;
1294 PrePareForQuerySyncMergeTest(false, devices, key, value, sendRequestCount);
1295
1296 /**
1297 * @tc.steps: step3. deviceA call query sync without any change in db
1298 * @tc.expected: step3. sync should return OK, and sendRequestCount should be 0, because this merge can be skipped
1299 */
1300 Query query = Query::Select().PrefixKey(key);
1301 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon0f1c56881602(const std::map<std::string, DBStatus>& statusMap) 1302 [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1303 /**
1304 * @tc.expected step3: when the second sync task return, sendRequestCount should be 0, because this merge
1305 * can be skipped because there is no change in db since last push sync
1306 */
1307 ASSERT_TRUE(statusMap.size() == devices.size());
1308 for (const auto &pair : statusMap) {
1309 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1310 EXPECT_TRUE(pair.second == OK);
1311 }
1312 }, query, false);
1313 ASSERT_TRUE(status == OK);
1314 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1315 EXPECT_EQ(sendRequestCount, 0);
1316 }
1317
1318 /**
1319 * @tc.name: GetDataNotify001
1320 * @tc.desc: Test GetDataNotify function, delay < 30s should sync ok, > 36 should timeout
1321 * @tc.type: FUNC
1322 * @tc.require: AR000D4876
1323 * @tc.author: zhangqiquan
1324 */
1325 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify001, TestSize.Level3)
1326 {
1327 ASSERT_NE(g_kvDelegatePtr, nullptr);
1328 DBStatus status = OK;
1329 std::vector<std::string> devices;
1330 devices.push_back(g_deviceB->GetDeviceId());
1331 const std::string DEVICE_A = "real_device";
1332 /**
1333 * @tc.steps: step1. deviceB set get data delay 40s
1334 */
1335 g_deviceB->DelayGetSyncData(WAIT_40_SECONDS);
1336 g_communicatorAggregator->SetTimeout(DEVICE_A, TIMEOUT_6_SECONDS);
1337
1338 /**
1339 * @tc.steps: step2. deviceA call sync and wait
1340 * @tc.expected: step2. sync should return OK. onComplete should be called, deviceB sync TIME_OUT.
1341 */
1342 std::map<std::string, DBStatus> result;
1343 std::map<std::string, int> virtualRes;
1344 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1345 EXPECT_EQ(status, OK);
1346 EXPECT_EQ(result.size(), devices.size());
1347 EXPECT_TRUE(result[DEVICE_B] == TIME_OUT || result[DEVICE_B] == OK);
1348 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1349 Query query = Query::Select();
__anon0f1c56881702(std::map<std::string, int> resMap) 1350 g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1351 virtualRes = std::move(resMap);
1352 }, true);
1353 EXPECT_EQ(virtualRes.size(), devices.size());
1354 EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_TIMEOUT));
1355 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1356
1357 /**
1358 * @tc.steps: step3. deviceB set get data delay 30s
1359 */
1360 g_deviceB->DelayGetSyncData(WAIT_30_SECONDS);
1361 /**
1362 * @tc.steps: step4. deviceA call sync and wait
1363 * @tc.expected: step4. sync should return OK. onComplete should be called, deviceB sync OK.
1364 */
1365 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1366 EXPECT_EQ(status, OK);
1367 EXPECT_EQ(result.size(), devices.size());
1368 EXPECT_EQ(result[DEVICE_B], OK);
1369 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
__anon0f1c56881802(std::map<std::string, int> resMap) 1370 g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1371 virtualRes = std::move(resMap);
1372 }, true);
1373 EXPECT_EQ(virtualRes.size(), devices.size());
1374 EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_FINISHED_ALL));
1375 g_deviceB->DelayGetSyncData(0);
1376 }