1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <condition_variable>
17 #include <gtest/gtest.h>
18 #include <thread>
19
20 #include "db_constant.h"
21 #include "db_common.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "mock_sync_task_context.h"
27 #include "platform_specific.h"
28 #include "single_ver_data_sync.h"
29 #include "single_ver_kv_sync_task_context.h"
30
31 using namespace testing::ext;
32 using namespace DistributedDB;
33 using namespace DistributedDBUnitTest;
34 using namespace std;
35
36 namespace {
37 class TestSingleVerKvSyncTaskContext : public SingleVerKvSyncTaskContext {
38 public:
39 TestSingleVerKvSyncTaskContext() = default;
40 };
41 string g_testDir;
42 const string STORE_ID = "kv_stroe_complex_sync_test";
43 const int WAIT_TIME = 1000;
44 const std::string DEVICE_A = "real_device";
45 const std::string DEVICE_B = "deviceB";
46 const std::string DEVICE_C = "deviceC";
47 const std::string CREATE_SYNC_TABLE_SQL =
48 "CREATE TABLE IF NOT EXISTS sync_data(" \
49 "key BLOB NOT NULL," \
50 "value BLOB," \
51 "timestamp INT NOT NULL," \
52 "flag INT NOT NULL," \
53 "device BLOB," \
54 "ori_device BLOB," \
55 "hash_key BLOB PRIMARY KEY NOT NULL," \
56 "w_timestamp INT," \
57 "modify_time INT," \
58 "create_time INT" \
59 ");";
60
61 KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
62 KvStoreConfig g_config;
63 DistributedDBToolsUnitTest g_tool;
64 DBStatus g_kvDelegateStatus = INVALID_ARGS;
65 KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
66 VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
67 KvVirtualDevice *g_deviceB = nullptr;
68 KvVirtualDevice *g_deviceC = nullptr;
69
70 // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
71 auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
72 placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
73
PullSyncTest()74 void PullSyncTest()
75 {
76 DBStatus status = OK;
77 std::vector<std::string> devices;
78 devices.push_back(g_deviceB->GetDeviceId());
79
80 Key key = {'1'};
81 Key key2 = {'2'};
82 Value value = {'1'};
83 g_deviceB->PutData(key, value, 0, 0);
84 g_deviceB->PutData(key2, value, 1, 0);
85
86 std::map<std::string, DBStatus> result;
87 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
88 ASSERT_TRUE(status == OK);
89
90 ASSERT_TRUE(result.size() == devices.size());
91 for (const auto &pair : result) {
92 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
93 EXPECT_TRUE(pair.second == OK);
94 }
95 Value value3;
96 EXPECT_EQ(g_kvDelegatePtr->Get(key, value3), OK);
97 EXPECT_EQ(value3, value);
98 EXPECT_EQ(g_kvDelegatePtr->Get(key2, value3), OK);
99 EXPECT_EQ(value3, value);
100 }
101
CrudTest()102 void CrudTest()
103 {
104 vector<Entry> entries;
105 int totalSize = 10;
106 for (int i = 0; i < totalSize; i++) {
107 Entry entry;
108 entry.key.push_back(i);
109 entry.value.push_back('2');
110 entries.push_back(entry);
111 }
112 EXPECT_TRUE(g_kvDelegatePtr->PutBatch(entries) == OK);
113 for (const auto &entry : entries) {
114 Value resultvalue;
115 EXPECT_TRUE(g_kvDelegatePtr->Get(entry.key, resultvalue) == OK);
116 EXPECT_TRUE(resultvalue == entry.value);
117 }
118 for (int i = 0; i < totalSize / 2; i++) { // 2: Half of the total
119 g_kvDelegatePtr->Delete(entries[i].key);
120 Value resultvalue;
121 EXPECT_TRUE(g_kvDelegatePtr->Get(entries[i].key, resultvalue) == NOT_FOUND);
122 }
123 for (int i = totalSize / 2; i < totalSize; i++) {
124 Value value = entries[i].value;
125 value.push_back('x');
126 EXPECT_TRUE(g_kvDelegatePtr->Put(entries[i].key, value) == OK);
127 Value resultvalue;
128 EXPECT_TRUE(g_kvDelegatePtr->Get(entries[i].key, resultvalue) == OK);
129 EXPECT_TRUE(resultvalue == value);
130 }
131 }
132
DataSync005()133 void DataSync005()
134 {
135 ASSERT_NE(g_communicatorAggregator, nullptr);
136 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
137 ASSERT_TRUE(dataSync != nullptr);
138 dataSync->SendSaveDataNotifyPacket(nullptr, 0, 0, 0, TIME_SYNC_MESSAGE);
139 EXPECT_EQ(g_communicatorAggregator->GetOnlineDevices().size(), 3u); // 3 online dev
140 delete dataSync;
141 }
142
DataSync008()143 void DataSync008()
144 {
145 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
146 ASSERT_TRUE(dataSync != nullptr);
147 auto context = new (std::nothrow) MockSyncTaskContext();
148 dataSync->PutDataMsg(nullptr);
149 bool isNeedHandle = false;
150 bool isContinue = false;
151 EXPECT_EQ(dataSync->MoveNextDataMsg(context, isNeedHandle, isContinue), nullptr);
152 EXPECT_EQ(isNeedHandle, false);
153 EXPECT_EQ(isContinue, false);
154 delete dataSync;
155 delete context;
156 }
157
ReSetWaterDogTest001()158 void ReSetWaterDogTest001()
159 {
160 /**
161 * @tc.steps: step1. put 10 key/value
162 * @tc.expected: step1, put return OK.
163 */
164 for (int i = 0; i < 5; i++) { // put 5 key
165 Key key = DistributedDBToolsUnitTest::GetRandPrefixKey({'a', 'b'}, 1024); // rand num 1024 for test
166 Value value;
167 DistributedDBToolsUnitTest::GetRandomKeyValue(value, 10 * 50 * 1024u); // 10 * 50 * 1024 = 500k
168 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
169 }
170 /**
171 * @tc.steps: step2. SetDeviceMtuSize
172 * @tc.expected: step2, return OK.
173 */
174 g_communicatorAggregator->SetDeviceMtuSize(DEVICE_A, 50 * 1024u); // 50 * 1024u = 50k
175 g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, 50 * 1024u); // 50 * 1024u = 50k
176 /**
177 * @tc.steps: step3. deviceA,deviceB sync to each other at same time
178 * @tc.expected: step3. sync should return OK.
179 */
180 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, true), E_OK);
181 g_communicatorAggregator->SetDeviceMtuSize(DEVICE_A, 5 * 1024u * 1024u); // 5 * 1024u * 1024u = 5m
182 g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, 5 * 1024u * 1024u); // 5 * 1024u * 1024u = 5m
183 }
184 }
185
186 class DistributedDBSingleVerP2PComplexSyncTest : public testing::Test {
187 public:
188 static void SetUpTestCase(void);
189 static void TearDownTestCase(void);
190 void SetUp();
191 void TearDown();
192 };
193
SetUpTestCase(void)194 void DistributedDBSingleVerP2PComplexSyncTest::SetUpTestCase(void)
195 {
196 /**
197 * @tc.setup: Init datadir and Virtual Communicator.
198 */
199 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
200 g_config.dataDir = g_testDir;
201 g_mgr.SetKvStoreConfig(g_config);
202
203 string dir = g_testDir + "/single_ver";
204 DIR* dirTmp = opendir(dir.c_str());
205 if (dirTmp == nullptr) {
206 OS::MakeDBDirectory(dir);
207 } else {
208 closedir(dirTmp);
209 }
210
211 g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
212 ASSERT_TRUE(g_communicatorAggregator != nullptr);
213 RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
214 }
215
TearDownTestCase(void)216 void DistributedDBSingleVerP2PComplexSyncTest::TearDownTestCase(void)
217 {
218 /**
219 * @tc.teardown: Release virtual Communicator and clear data dir.
220 */
221 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
222 LOGE("rm test db files error!");
223 }
224 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
225 }
226
SetUp(void)227 void DistributedDBSingleVerP2PComplexSyncTest::SetUp(void)
228 {
229 DistributedDBToolsUnitTest::PrintTestCaseInfo();
230 /**
231 * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
232 */
233 KvStoreNbDelegate::Option option;
234 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
235 ASSERT_TRUE(g_kvDelegateStatus == OK);
236 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
237 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
238 ASSERT_TRUE(g_deviceB != nullptr);
239 VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
240 ASSERT_TRUE(syncInterfaceB != nullptr);
241 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
242
243 g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
244 ASSERT_TRUE(g_deviceC != nullptr);
245 VirtualSingleVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
246 ASSERT_TRUE(syncInterfaceC != nullptr);
247 ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
248
249 auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
250 const std::string &deviceId, uint8_t flag) -> bool {
251 return true;
252 };
253 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
254 }
255
TearDown(void)256 void DistributedDBSingleVerP2PComplexSyncTest::TearDown(void)
257 {
258 /**
259 * @tc.teardown: Release device A, B, C
260 */
261 if (g_kvDelegatePtr != nullptr) {
262 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
263 g_kvDelegatePtr = nullptr;
264 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
265 LOGD("delete kv store status %d", status);
266 ASSERT_TRUE(status == OK);
267 }
268 if (g_deviceB != nullptr) {
269 delete g_deviceB;
270 g_deviceB = nullptr;
271 }
272 if (g_deviceC != nullptr) {
273 delete g_deviceC;
274 g_deviceC = nullptr;
275 }
276 PermissionCheckCallbackV2 nullCallback;
277 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
278 }
279
280 /**
281 * @tc.name: SaveDataNotify001
282 * @tc.desc: Test SaveDataNotify function, delay < 30s should sync ok, > 36 should timeout
283 * @tc.type: FUNC
284 * @tc.require:
285 * @tc.author: xushaohua
286 */
287 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SaveDataNotify001, TestSize.Level3)
288 {
289 DBStatus status = OK;
290 const int waitFiveSeconds = 5000;
291 const int waitThirtySeconds = 30000;
292 const int waitThirtySixSeconds = 36000;
293 std::vector<std::string> devices;
294 devices.push_back(g_deviceB->GetDeviceId());
295
296 /**
297 * @tc.steps: step1. deviceA put {k1, v1}
298 */
299 Key key = {'1'};
300 Value value = {'1'};
301 status = g_kvDelegatePtr->Put(key, value);
302 ASSERT_TRUE(status == OK);
303
304 /**
305 * @tc.steps: step2. deviceB set sava data dely 5s
306 */
307 g_deviceB->SetSaveDataDelayTime(waitFiveSeconds);
308
309 /**
310 * @tc.steps: step3. deviceA call sync and wait
311 * @tc.expected: step3. sync should return OK. onComplete should be called, deviceB sync success.
312 */
313 std::map<std::string, DBStatus> result;
314 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
315 ASSERT_TRUE(status == OK);
316 ASSERT_TRUE(result.size() == devices.size());
317 ASSERT_TRUE(result[DEVICE_B] == OK);
318
319 /**
320 * @tc.steps: step4. deviceB set sava data dely 30s and put {k1, v1}
321 */
322 g_deviceB->SetSaveDataDelayTime(waitThirtySeconds);
323 status = g_kvDelegatePtr->Put(key, value);
324 ASSERT_TRUE(status == OK);
325 /**
326 * @tc.steps: step3. deviceA call sync and wait
327 * @tc.expected: step3. sync should return OK. onComplete should be called, deviceB sync success.
328 */
329 result.clear();
330 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
331 ASSERT_TRUE(status == OK);
332 ASSERT_TRUE(result.size() == devices.size());
333 ASSERT_TRUE(result[DEVICE_B] == OK);
334
335 /**
336 * @tc.steps: step4. deviceB set sava data dely 36s and put {k1, v1}
337 */
338 g_deviceB->SetSaveDataDelayTime(waitThirtySixSeconds);
339 status = g_kvDelegatePtr->Put(key, value);
340 ASSERT_TRUE(status == OK);
341 /**
342 * @tc.steps: step5. deviceA call sync and wait
343 * @tc.expected: step5. sync should return OK. onComplete should be called, deviceB sync TIME_OUT.
344 */
345 result.clear();
346 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
347 ASSERT_TRUE(status == OK);
348 ASSERT_TRUE(result.size() == devices.size());
349 ASSERT_TRUE(result[DEVICE_B] == TIME_OUT);
350 }
351
352 /**
353 * @tc.name: SametimeSync001
354 * @tc.desc: Test 2 device sync with each other
355 * @tc.type: FUNC
356 * @tc.require:
357 * @tc.author: zhangqiquan
358 */
359 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SametimeSync001, TestSize.Level3)
360 {
361 DBStatus status = OK;
362 std::vector<std::string> devices;
363 devices.push_back(g_deviceB->GetDeviceId());
364
365 int responseCount = 0;
366 int requestCount = 0;
367 Key key = {'1'};
368 Value value = {'1'};
369 /**
370 * @tc.steps: step1. make sure deviceB send pull firstly and response_pull secondly
371 * @tc.expected: step1. deviceA put data when finish push task. put data should return OK.
372 */
373 g_communicatorAggregator->RegOnDispatch([&responseCount, &requestCount, &key, &value](
__anon762f6f830302( const std::string &target, DistributedDB::Message *msg) 374 const std::string &target, DistributedDB::Message *msg) {
375 if (target != "real_device" || msg->GetMessageId() != DATA_SYNC_MESSAGE) {
376 return;
377 }
378
379 if (msg->GetMessageType() == TYPE_RESPONSE) {
380 responseCount++;
381 if (responseCount == 1) { // 1 is the ack which B response A's push task
382 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), DBStatus::OK);
383 std::this_thread::sleep_for(std::chrono::seconds(1));
384 } else if (responseCount == 2) { // 2 is the ack which B response A's response_pull task
385 msg->SetErrorNo(E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
386 }
387 } else if (msg->GetMessageType() == TYPE_REQUEST) {
388 requestCount++;
389 if (requestCount == 1) { // 1 is A push task
390 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2 sec
391 }
392 }
393 });
394 /**
395 * @tc.steps: step2. deviceA,deviceB sync to each other at same time
396 * @tc.expected: step2. sync should return OK.
397 */
398 std::map<std::string, DBStatus> result;
__anon762f6f830402null399 std::thread subThread([]{
400 g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, true);
401 });
402 status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_PULL, result);
403 subThread.join();
404 g_communicatorAggregator->RegOnDispatch(nullptr);
405
406 EXPECT_TRUE(status == OK);
407 ASSERT_TRUE(result.size() == devices.size());
408 EXPECT_TRUE(result[DEVICE_B] == OK);
409 Value actualValue;
410 g_kvDelegatePtr->Get(key, actualValue);
411 EXPECT_EQ(actualValue, value);
412 }
413
414 /**
415 * @tc.name: SametimeSync002
416 * @tc.desc: Test 2 device sync with each other with water error
417 * @tc.type: FUNC
418 * @tc.require:
419 * @tc.author: zhangqiquan
420 */
421 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SametimeSync002, TestSize.Level3)
422 {
423 DBStatus status = OK;
424 std::vector<std::string> devices;
425 devices.push_back(g_deviceB->GetDeviceId());
426 g_kvDelegatePtr->Put({'k', '1'}, {'v', '1'});
427 /**
428 * @tc.steps: step1. make sure deviceA push data failed and increase water mark
429 * @tc.expected: step1. deviceA push failed with timeout
430 */
__anon762f6f830502(const std::string &target, DistributedDB::Message *msg) 431 g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
432 ASSERT_NE(msg, nullptr);
433 if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
434 msg->SetMessageId(INVALID_MESSAGE_ID);
435 }
436 });
437 std::map<std::string, DBStatus> result;
__anon762f6f830602(const std::map<std::string, DBStatus> &map) 438 auto callback = [&result](const std::map<std::string, DBStatus> &map) {
439 result = map;
440 };
441 Query query = Query::Select().PrefixKey({'k', '1'});
442 EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
443 ASSERT_TRUE(result.size() == devices.size());
444 EXPECT_TRUE(result[DEVICE_B] == TIME_OUT);
445 /**
446 * @tc.steps: step2. A push to B with query2, sleep 1s for waiting step3
447 * @tc.expected: step2. sync should return OK.
448 */
__anon762f6f830702(const std::string &target, DistributedDB::Message *msg) 449 g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
450 ASSERT_NE(msg, nullptr);
451 if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
452 std::this_thread::sleep_for(std::chrono::seconds(1));
453 }
454 });
__anon762f6f830802null455 std::thread subThread([&devices] {
456 std::map<std::string, DBStatus> result;
457 auto callback = [&result](const std::map<std::string, DBStatus> &map) {
458 result = map;
459 };
460 Query query = Query::Select().PrefixKey({'k', '2'});
461 LOGD("Begin PUSH");
462 EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
463 ASSERT_TRUE(result.size() == devices.size());
464 EXPECT_TRUE(result[DEVICE_B] == OK);
465 });
466 /**
467 * @tc.steps: step3. B pull to A when A is in push task
468 * @tc.expected: step3. sync should return OP_FINISHED_ALL.
469 */
470 std::this_thread::sleep_for(std::chrono::milliseconds(100));
471 std::map<std::string, int> virtualResult;
472 g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, query,
__anon762f6f830a02(const std::map<std::string, int> &map) 473 [&virtualResult](const std::map<std::string, int> &map) {
474 virtualResult = map;
475 }, true);
476 EXPECT_TRUE(status == OK);
477 ASSERT_EQ(virtualResult.size(), devices.size());
478 EXPECT_EQ(virtualResult[DEVICE_A], SyncOperation::OP_FINISHED_ALL);
479 g_communicatorAggregator->RegOnDispatch(nullptr);
480 subThread.join();
481 }
482
483 /**
484 * @tc.name: DatabaseOnlineCallback001
485 * @tc.desc: check database status notify online callback
486 * @tc.type: FUNC
487 * @tc.require:
488 * @tc.author: zhuwentao
489 */
490 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DatabaseOnlineCallback001, TestSize.Level1)
491 {
492 /**
493 * @tc.steps: step1. SetStoreStatusNotifier
494 * @tc.expected: step1. SetStoreStatusNotifier ok
495 */
496 std::string targetDev = "DEVICE_X";
497 bool isCheckOk = false;
498 auto databaseStatusNotifyCallback = [targetDev, &isCheckOk] (const std::string &userId,
__anon762f6f830b02(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) 499 const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) -> void {
500 if (userId == USER_ID && appId == APP_ID && storeId == STORE_ID && deviceId == targetDev &&
501 onlineStatus == true) {
502 isCheckOk = true;
503 }};
504 g_mgr.SetStoreStatusNotifier(databaseStatusNotifyCallback);
505 /**
506 * @tc.steps: step2. trigger device online
507 * @tc.expected: step2. check callback ok
508 */
509 g_communicatorAggregator->OnlineDevice(targetDev);
510 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME / 20));
511 EXPECT_EQ(isCheckOk, true);
512 StoreStatusNotifier nullCallback;
513 g_mgr.SetStoreStatusNotifier(nullCallback);
514 }
515
516 /**
517 * @tc.name: DatabaseOfflineCallback001
518 * @tc.desc: check database status notify online callback
519 * @tc.type: FUNC
520 * @tc.require:
521 * @tc.author: zhuwentao
522 */
523 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DatabaseOfflineCallback001, TestSize.Level1)
524 {
525 /**
526 * @tc.steps: step1. SetStoreStatusNotifier
527 * @tc.expected: step1. SetStoreStatusNotifier ok
528 */
529 std::string targetDev = "DEVICE_X";
530 bool isCheckOk = false;
531 auto databaseStatusNotifyCallback = [targetDev, &isCheckOk] (const std::string &userId,
__anon762f6f830c02(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) 532 const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) -> void {
533 if (userId == USER_ID && appId == APP_ID && storeId == STORE_ID && deviceId == targetDev &&
534 onlineStatus == false) {
535 isCheckOk = true;
536 }};
537 g_mgr.SetStoreStatusNotifier(databaseStatusNotifyCallback);
538 /**
539 * @tc.steps: step2. trigger device offline
540 * @tc.expected: step2. check callback ok
541 */
542 g_communicatorAggregator->OfflineDevice(targetDev);
543 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME / 20));
544 EXPECT_EQ(isCheckOk, true);
545 StoreStatusNotifier nullCallback;
546 g_mgr.SetStoreStatusNotifier(nullCallback);
547 }
548
549 /**
550 * @tc.name: CloseSync001
551 * @tc.desc: Test 2 delegate close when sync
552 * @tc.type: FUNC
553 * @tc.require:
554 * @tc.author: zhangqiquan
555 */
556 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, CloseSync001, TestSize.Level3)
557 {
558 DBStatus status = OK;
559 std::vector<std::string> devices;
560 devices.push_back(g_deviceB->GetDeviceId());
561
562 /**
563 * @tc.steps: step1. make sure A sync start
564 */
565 bool sleep = false;
__anon762f6f830d02(const std::string &target, DistributedDB::Message *msg) 566 g_communicatorAggregator->RegOnDispatch([&sleep](const std::string &target, DistributedDB::Message *msg) {
567 if (!sleep) {
568 sleep = true;
569 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s for waiting close db
570 }
571 });
572
573 KvStoreNbDelegate* kvDelegatePtrA = nullptr;
574 KvStoreNbDelegate::Option option;
__anon762f6f830e02(DBStatus s, KvStoreNbDelegate *delegate) 575 g_mgr.GetKvStore(STORE_ID, option, [&status, &kvDelegatePtrA](DBStatus s, KvStoreNbDelegate *delegate) {
576 status = s;
577 kvDelegatePtrA = delegate;
578 });
579 EXPECT_EQ(status, OK);
580 EXPECT_NE(kvDelegatePtrA, nullptr);
581
582 Key key = {'k'};
583 Value value = {'v'};
584 kvDelegatePtrA->Put(key, value);
585 std::map<std::string, DBStatus> result;
__anon762f6f830f02(const std::map<std::string, DBStatus>& statusMap) 586 auto callback = [&result](const std::map<std::string, DBStatus>& statusMap) {
587 result = statusMap;
588 };
589 /**
590 * @tc.steps: step2. deviceA sync and then close
591 * @tc.expected: step2. sync should abort and data don't exist in B
592 */
__anon762f6f831002() 593 std::thread closeThread([&kvDelegatePtrA]() {
594 std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for waiting sync start
595 EXPECT_EQ(g_mgr.CloseKvStore(kvDelegatePtrA), OK);
596 });
597 EXPECT_EQ(kvDelegatePtrA->Sync(devices, SYNC_MODE_PUSH_ONLY, callback, false), OK);
598 LOGD("Sync finish");
599 closeThread.join();
600 std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s for waiting sync finish
601 EXPECT_EQ(result.size(), 0u);
602 VirtualDataItem actualValue;
603 EXPECT_EQ(g_deviceB->GetData(key, actualValue), -E_NOT_FOUND);
604 g_communicatorAggregator->RegOnDispatch(nullptr);
605 }
606
607 /**
608 * @tc.name: CloseSync002
609 * @tc.desc: Test 1 delegate close when in time sync
610 * @tc.type: FUNC
611 * @tc.require:
612 * @tc.author: zhangqiquan
613 */
614 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, CloseSync002, TestSize.Level3)
615 {
616 /**
617 * @tc.steps: step1. invalid time sync packet from A
618 */
__anon762f6f831102(const std::string &target, DistributedDB::Message *msg) 619 g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
620 ASSERT_NE(msg, nullptr);
621 if (target == DEVICE_B && msg->GetMessageId() == TIME_SYNC_MESSAGE && msg->GetMessageType() == TYPE_REQUEST) {
622 msg->SetMessageId(INVALID_MESSAGE_ID);
623 LOGD("Message is invalid");
624 }
625 });
626 Timestamp currentTime;
627 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
628 g_deviceB->PutData({'k'}, {'v'}, currentTime, 0);
629
630 /**
631 * @tc.steps: step2. B PUSH to A and A close after 1s
632 * @tc.expected: step2. A closing time cost letter than 4s
633 */
__anon762f6f831202() 634 std::thread closingThread([]() {
635 std::this_thread::sleep_for(std::chrono::seconds(1));
636 LOGD("Begin Close");
637 Timestamp beginTime;
638 (void)OS::GetCurrentSysTimeInMicrosecond(beginTime);
639 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
640 Timestamp endTime;
641 (void)OS::GetCurrentSysTimeInMicrosecond(endTime);
642 EXPECT_LE(static_cast<int>(endTime - beginTime), 4 * 1000 * 1000); // waiting 4 * 1000 * 1000 us
643 LOGD("End Close");
644 });
645 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
646 closingThread.join();
647
648 /**
649 * @tc.steps: step3. remove db
650 * @tc.expected: step3. remove ok
651 */
652 g_kvDelegatePtr = nullptr;
653 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
654 LOGD("delete kv store status %d", status);
655 ASSERT_TRUE(status == OK);
656 g_communicatorAggregator->RegOnDispatch(nullptr);
657 }
658
659 /**
660 * @tc.name: OrderbyWriteTimeSync001
661 * @tc.desc: sync query with order by writeTime
662 * @tc.type: FUNC
663 * @tc.require:
664 * @tc.author: zhuwentao
665 */
666 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, OrderbyWriteTimeSync001, TestSize.Level0)
667 {
668 /**
669 * @tc.steps: step1. deviceA subscribe query with order by write time
670 * * @tc.expected: step1. interface return not support
671 */
672 std::vector<std::string> devices;
673 devices.push_back(g_deviceB->GetDeviceId());
674 Query query = Query::Select().PrefixKey({'k'}).OrderByWriteTime(true);
675 EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, nullptr, query, true), NOT_SUPPORT);
676 }
677
678
679 /**
680 * @tc.name: Device Offline Sync 001
681 * @tc.desc: Test push sync when device offline
682 * @tc.type: FUNC
683 * @tc.require:
684 * @tc.author: xushaohua
685 */
686 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSync001, TestSize.Level1)
687 {
688 std::vector<std::string> devices;
689 devices.push_back(g_deviceB->GetDeviceId());
690 devices.push_back(g_deviceC->GetDeviceId());
691
692 /**
693 * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2}, {k3 delete}, {k4,v2}
694 */
695 Key key1 = {'1'};
696 Value value1 = {'1'};
697 ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value1) == OK);
698
699 Key key2 = {'2'};
700 Value value2 = {'2'};
701 ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value2) == OK);
702
703 Key key3 = {'3'};
704 Value value3 = {'3'};
705 ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value3) == OK);
706 ASSERT_TRUE(g_kvDelegatePtr->Delete(key3) == OK);
707
708 Key key4 = {'4'};
709 Value value4 = {'4'};
710 ASSERT_TRUE(g_kvDelegatePtr->Put(key4, value4) == OK);
711
712 /**
713 * @tc.steps: step2. deviceB offline
714 */
715 g_deviceB->Offline();
716
717 /**
718 * @tc.steps: step3. deviceA call pull sync
719 * @tc.expected: step3. sync should return OK.
720 */
721 std::map<std::string, DBStatus> result;
722 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
723 ASSERT_TRUE(status == OK);
724
725 /**
726 * @tc.expected: step3. onComplete should be called, DeviceB status is timeout
727 * deviceC has {k1, v1}, {k2, v2}, {k3 delete}, {k4,v4}
728 */
729 for (const auto &pair : result) {
730 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
731 if (pair.first == DEVICE_B) {
732 // If syncTaskContext of deviceB is scheduled to be executed first, ClearAllSyncTask is
733 // invoked when OfflineHandleByDevice is triggered, and SyncOperation::Finished() is triggered in advance.
734 // The returned status is COMM_FAILURE
735 EXPECT_TRUE((pair.second == static_cast<DBStatus>(-E_PERIPHERAL_INTERFACE_FAIL)) ||
736 (pair.second == COMM_FAILURE));
737 } else {
738 EXPECT_EQ(pair.second, OK);
739 }
740 }
741 VirtualDataItem item;
742 g_deviceC->GetData(key1, item);
743 EXPECT_TRUE(item.value == value1);
744 item.value.clear();
745 g_deviceC->GetData(key2, item);
746 EXPECT_TRUE(item.value == value2);
747 item.value.clear();
748 Key hashKey;
749 DistributedDBToolsUnitTest::CalcHash(key3, hashKey);
750 EXPECT_TRUE(g_deviceC->GetData(hashKey, item) == -E_NOT_FOUND);
751 item.value.clear();
752 g_deviceC->GetData(key4, item);
753 EXPECT_TRUE(item.value == value4);
754 }
755
756 /**
757 * @tc.name: Device Offline Sync 002
758 * @tc.desc: Test pull sync when device offline
759 * @tc.type: FUNC
760 * @tc.require:
761 * @tc.author: xushaohua
762 */
763 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSync002, TestSize.Level1)
764 {
765 std::vector<std::string> devices;
766 devices.push_back(g_deviceB->GetDeviceId());
767 devices.push_back(g_deviceC->GetDeviceId());
768
769 /**
770 * @tc.steps: step1. deviceB put {k1, v1}
771 */
772 Key key1 = {'1'};
773 Value value1 = {'1'};
774 g_deviceB->PutData(key1, value1, 0, 0);
775
776 /**
777 * @tc.steps: step2. deviceB offline
778 */
779 g_deviceB->Offline();
780
781 /**
782 * @tc.steps: step3. deviceC put {k2, v2}, {k3, delete}, {k4, v4}
783 */
784 Key key2 = {'2'};
785 Value value2 = {'2'};
786 g_deviceC->PutData(key2, value2, 0, 0);
787
788 Key key3 = {'3'};
789 Value value3 = {'3'};
790 g_deviceC->PutData(key3, value3, 0, 1);
791
792 Key key4 = {'4'};
793 Value value4 = {'4'};
794 g_deviceC->PutData(key4, value4, 0, 0);
795
796 /**
797 * @tc.steps: step2. deviceA call pull sync
798 * @tc.expected: step2. sync should return OK.
799 */
800 std::map<std::string, DBStatus> result;
801 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
802 ASSERT_TRUE(status == OK);
803
804 /**
805 * @tc.expected: step3. onComplete should be called, DeviceB status is timeout
806 * deviceA has {k2, v2}, {k3 delete}, {k4,v4}
807 */
808 for (const auto &pair : result) {
809 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
810 if (pair.first == DEVICE_B) {
811 // If syncTaskContext of deviceB is scheduled to be executed first, ClearAllSyncTask is
812 // invoked when OfflineHandleByDevice is triggered, and SyncOperation::Finished() is triggered in advance.
813 // The returned status is COMM_FAILURE
814 EXPECT_TRUE((pair.second == static_cast<DBStatus>(-E_PERIPHERAL_INTERFACE_FAIL)) ||
815 (pair.second == COMM_FAILURE));
816 } else {
817 EXPECT_EQ(pair.second, OK);
818 }
819 }
820
821 Value value5;
822 EXPECT_TRUE(g_kvDelegatePtr->Get(key1, value5) != OK);
823 g_kvDelegatePtr->Get(key2, value5);
824 EXPECT_EQ(value5, value2);
825 EXPECT_TRUE(g_kvDelegatePtr->Get(key3, value5) != OK);
826 g_kvDelegatePtr->Get(key4, value5);
827 EXPECT_EQ(value5, value4);
828 }
829
830 /**
831 * @tc.name: EncryptedAlgoUpgrade001
832 * @tc.desc: Test upgrade encrypted db can sync normally
833 * @tc.type: FUNC
834 * @tc.require:
835 * @tc.author: zhuwentao
836 */
837 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, EncryptedAlgoUpgrade001, TestSize.Level3)
838 {
839 /**
840 * @tc.steps: step1. clear db
841 * * @tc.expected: step1. interface return ok
842 */
843 if (g_kvDelegatePtr != nullptr) {
844 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
845 g_kvDelegatePtr = nullptr;
846 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
847 LOGD("delete kv store status %d", status);
848 ASSERT_TRUE(status == OK);
849 }
850
851 CipherPassword passwd;
852 std::vector<uint8_t> passwdVect = {'p', 's', 'd', '1'};
853 passwd.SetValue(passwdVect.data(), passwdVect.size());
854 /**
855 * @tc.steps: step2. open old db by sql
856 * * @tc.expected: step2. interface return ok
857 */
858 std::string identifier = DBCommon::GenerateIdentifierId(STORE_ID, APP_ID, USER_ID);
859 std::string hashDir = DBCommon::TransferHashString(identifier);
860 std::string hexHashDir = DBCommon::TransferStringToHex(hashDir);
861 std::string dbPath = g_testDir + "/" + hexHashDir + "/single_ver";
862 ASSERT_TRUE(DBCommon::CreateDirectory(g_testDir + "/" + hexHashDir) == E_OK);
863 ASSERT_TRUE(DBCommon::CreateDirectory(dbPath) == E_OK);
864 std::vector<std::string> dbDir {DBConstant::MAINDB_DIR, DBConstant::METADB_DIR, DBConstant::CACHEDB_DIR};
865 for (const auto &item : dbDir) {
866 ASSERT_TRUE(DBCommon::CreateDirectory(dbPath + "/" + item) == E_OK);
867 }
868 uint64_t flag = SQLITE_OPEN_URI | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
869 sqlite3 *db;
870 std::string fileUrl = dbPath + "/" + DBConstant::MAINDB_DIR + "/" + DBConstant::SINGLE_VER_DATA_STORE + ".db";
871 ASSERT_TRUE(sqlite3_open_v2(fileUrl.c_str(), &db, flag, nullptr) == SQLITE_OK);
872 SQLiteUtils::SetKeyInner(db, CipherType::AES_256_GCM, passwd, DBConstant::DEFAULT_ITER_TIMES);
873 /**
874 * @tc.steps: step3. create table and close
875 * * @tc.expected: step3. interface return ok
876 */
877 ASSERT_TRUE(SQLiteUtils::ExecuteRawSQL(db, CREATE_SYNC_TABLE_SQL) == E_OK);
878 sqlite3_close_v2(db);
879 db = nullptr;
880 LOGI("create old db success");
881 /**
882 * @tc.steps: step4. get kvstore
883 * * @tc.expected: step4. interface return ok
884 */
885 KvStoreNbDelegate::Option option;
886 option.isEncryptedDb = true;
887 option.cipher = CipherType::AES_256_GCM;
888 option.passwd = passwd;
889 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
890 ASSERT_TRUE(g_kvDelegateStatus == OK);
891 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
892 /**
893 * @tc.steps: step5. sync ok
894 * * @tc.expected: step5. interface return ok
895 */
896 PullSyncTest();
897 /**
898 * @tc.steps: step5. crud ok
899 * * @tc.expected: step5. interface return ok
900 */
901 CrudTest();
902 }
903
904 /**
905 * @tc.name: RemoveDeviceData002
906 * @tc.desc: test remove device data before sync
907 * @tc.type: FUNC
908 * @tc.require:
909 * @tc.author: zhuwentao
910 */
911 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RemoveDeviceData002, TestSize.Level1)
912 {
913 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
914 /**
915 * @tc.steps: step1. sync deviceB data to A and check data
916 * * @tc.expected: step1. interface return ok
917 */
918 Key key1 = {'1'};
919 Key key2 = {'2'};
920 Value value = {'1'};
921 Timestamp currentTime;
922 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
923 EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
924 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
925 EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
926 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
927 Value actualValue;
928 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
929 EXPECT_EQ(actualValue, value);
930 actualValue.clear();
931 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
932 EXPECT_EQ(actualValue, value);
933 /**
934 * @tc.steps: step2. call RemoveDeviceData
935 * * @tc.expected: step2. interface return ok
936 */
937 g_kvDelegatePtr->RemoveDeviceData(g_deviceB->GetDeviceId());
938 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
939 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), NOT_FOUND);
940 /**
941 * @tc.steps: step3. sync to device A again and check data
942 * * @tc.expected: step3. sync ok
943 */
944 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
945 actualValue.clear();
946 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
947 EXPECT_EQ(actualValue, value);
948 actualValue.clear();
949 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
950 EXPECT_EQ(actualValue, value);
951 }
952
953 /**
954 * @tc.name: DataSync001
955 * @tc.desc: Test Data Sync when Initialize
956 * @tc.type: FUNC
957 * @tc.require:
958 * @tc.author: zhuwentao
959 */
960 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync001, TestSize.Level1)
961 {
962 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
963 ASSERT_TRUE(dataSync != nullptr);
964 std::shared_ptr<Metadata> inMetadata = nullptr;
965 std::string deviceId;
966 Message message;
967 VirtualSingleVerSyncDBInterface tmpInterface;
968 VirtualCommunicator tmpCommunicator(deviceId, g_communicatorAggregator);
969 EXPECT_EQ(dataSync->Initialize(nullptr, nullptr, inMetadata, deviceId), -E_INVALID_ARGS);
970 EXPECT_EQ(dataSync->Initialize(&tmpInterface, nullptr, inMetadata, deviceId), -E_INVALID_ARGS);
971 EXPECT_EQ(dataSync->Initialize(&tmpInterface, &tmpCommunicator, inMetadata, deviceId), -E_INVALID_ARGS);
972 delete dataSync;
973 }
974
975 /**
976 * @tc.name: DataSync002
977 * @tc.desc: Test active sync with invalid param in DataSync Class
978 * @tc.type: FUNC
979 * @tc.require:
980 * @tc.author: zhuwentao
981 */
982 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync002, TestSize.Level1)
983 {
984 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
985 ASSERT_TRUE(dataSync != nullptr);
986 Message message;
987 EXPECT_EQ(dataSync->TryContinueSync(nullptr, &message), -E_INVALID_ARGS);
988 EXPECT_EQ(dataSync->TryContinueSync(nullptr, nullptr), -E_INVALID_ARGS);
989 EXPECT_EQ(dataSync->PushStart(nullptr), -E_INVALID_ARGS);
990 EXPECT_EQ(dataSync->PushPullStart(nullptr), -E_INVALID_ARGS);
991 EXPECT_EQ(dataSync->PullRequestStart(nullptr), -E_INVALID_ARGS);
992 EXPECT_EQ(dataSync->PullResponseStart(nullptr), -E_INVALID_ARGS);
993 delete dataSync;
994 }
995
996 /**
997 * @tc.name: DataSync003
998 * @tc.desc: Test receive invalid request data packet in DataSync Class
999 * @tc.type: FUNC
1000 * @tc.require:
1001 * @tc.author: zhuwentao
1002 */
1003 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync003, TestSize.Level1)
1004 {
1005 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1006 ASSERT_TRUE(dataSync != nullptr);
1007 uint64_t tmpMark = 0;
1008 Message message;
1009 EXPECT_EQ(dataSync->DataRequestRecv(nullptr, nullptr, tmpMark), -E_INVALID_ARGS);
1010 EXPECT_EQ(dataSync->DataRequestRecv(nullptr, &message, tmpMark), -E_INVALID_ARGS);
1011 delete dataSync;
1012 }
1013
1014 /**
1015 * @tc.name: DataSync004
1016 * @tc.desc: Test receive invalid ack packet in DataSync Class
1017 * @tc.type: FUNC
1018 * @tc.require:
1019 * @tc.author: zhuwentao
1020 */
1021 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync004, TestSize.Level1)
1022 {
1023 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1024 ASSERT_TRUE(dataSync != nullptr);
1025 Message message;
1026 TestSingleVerKvSyncTaskContext tmpContext;
1027 EXPECT_EQ(dataSync->AckPacketIdCheck(nullptr), false);
1028 EXPECT_EQ(dataSync->AckPacketIdCheck(&message), false);
1029 EXPECT_EQ(dataSync->AckRecv(&tmpContext, nullptr), -E_INVALID_ARGS);
1030 EXPECT_EQ(dataSync->AckRecv(nullptr, nullptr), -E_INVALID_ARGS);
1031 EXPECT_EQ(dataSync->AckRecv(nullptr, &message), -E_INVALID_ARGS);
1032 delete dataSync;
1033 }
1034
1035 /**
1036 * @tc.name: DataSync005
1037 * @tc.desc: Test receive invalid notify packet in DataSync Class
1038 * @tc.type: FUNC
1039 * @tc.require:
1040 * @tc.author: zhuwentao
1041 */
1042 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync005, TestSize.Level1)
1043 {
1044 ASSERT_NO_FATAL_FAILURE(DataSync005());
1045 }
1046
1047 /**
1048 * @tc.name: DataSync006
1049 * @tc.desc: Test control start with invalid param in DataSync Class
1050 * @tc.type: FUNC
1051 * @tc.require:
1052 * @tc.author: zhuwentao
1053 */
1054 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync006, TestSize.Level1)
1055 {
1056 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1057 ASSERT_TRUE(dataSync != nullptr);
1058 TestSingleVerKvSyncTaskContext tmpContext;
1059 EXPECT_EQ(dataSync->ControlCmdStart(nullptr), -E_INVALID_ARGS);
1060 EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_INVALID_ARGS);
1061 std::shared_ptr<SubscribeManager> subManager = std::make_shared<SubscribeManager>();
1062 tmpContext.SetSubscribeManager(subManager);
1063 tmpContext.SetMode(SyncModeType::INVALID_MODE);
1064 EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_INVALID_ARGS);
1065 std::set<Key> Keys = {{'a'}, {'b'}};
1066 Query query = Query::Select().InKeys(Keys);
1067 QuerySyncObject innerQuery(query);
1068 tmpContext.SetQuery(innerQuery);
1069 tmpContext.SetMode(SyncModeType::SUBSCRIBE_QUERY);
1070 EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_NOT_SUPPORT);
1071 delete dataSync;
1072 subManager = nullptr;
1073 }
1074
1075 /**
1076 * @tc.name: DataSync007
1077 * @tc.desc: Test receive invalid control packet in DataSync Class
1078 * @tc.type: FUNC
1079 * @tc.require:
1080 * @tc.author: zhuwentao
1081 */
1082 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync007, TestSize.Level1)
1083 {
1084 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1085 ASSERT_TRUE(dataSync != nullptr);
1086 Message message;
1087 ControlRequestPacket packet;
1088 TestSingleVerKvSyncTaskContext tmpContext;
1089 EXPECT_EQ(dataSync->ControlCmdRequestRecv(nullptr, &message), -E_INVALID_ARGS);
1090 message.SetCopiedObject(packet);
1091 EXPECT_EQ(dataSync->ControlCmdRequestRecv(nullptr, &message), -E_INVALID_ARGS);
1092 delete dataSync;
1093 }
1094
1095 /**
1096 * @tc.name: DataSync008
1097 * @tc.desc: Test pull null msg in dataQueue in DataSync Class
1098 * @tc.type: FUNC
1099 * @tc.require:
1100 * @tc.author: zhuwentao
1101 */
1102 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync008, TestSize.Level1)
1103 {
1104 ASSERT_NO_FATAL_FAILURE(DataSync008());
1105 }
1106
1107 /**
1108 * @tc.name: SyncRetry001
1109 * @tc.desc: use sync retry sync use push
1110 * @tc.type: FUNC
1111 * @tc.require:
1112 * @tc.author: zhuwentao
1113 */
1114 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry001, TestSize.Level3)
1115 {
1116 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1117 std::vector<std::string> devices;
1118 devices.push_back(g_deviceB->GetDeviceId());
1119
1120 /**
1121 * @tc.steps: step1. set sync retry
1122 * @tc.expected: step1, Pragma return OK.
1123 */
1124 int pragmaData = 1;
1125 PragmaData input = static_cast<PragmaData>(&pragmaData);
1126 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1127
1128 /**
1129 * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
1130 */
1131 ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
1132
1133 /**
1134 * @tc.steps: step3. deviceA call sync and wait
1135 * @tc.expected: step3. sync should return OK.
1136 */
1137 std::map<std::string, DBStatus> result;
1138 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1139
1140 /**
1141 * @tc.expected: step4. onComplete should be called, and status is time_out
1142 */
1143 ASSERT_TRUE(result.size() == devices.size());
1144 for (const auto &pair : result) {
1145 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1146 EXPECT_TRUE(pair.second == OK);
1147 }
1148 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1149 }
1150
1151 /**
1152 * @tc.name: SyncRetry002
1153 * @tc.desc: use sync retry sync use pull
1154 * @tc.type: FUNC
1155 * @tc.require:
1156 * @tc.author: zhuwentao
1157 */
1158 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry002, TestSize.Level3)
1159 {
1160 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE, 4u);
1161 std::vector<std::string> devices;
1162 devices.push_back(g_deviceB->GetDeviceId());
1163
1164 /**
1165 * @tc.steps: step1. set sync retry
1166 * @tc.expected: step1, Pragma return OK.
1167 */
1168 int pragmaData = 1;
1169 PragmaData input = static_cast<PragmaData>(&pragmaData);
1170 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1171
1172 /**
1173 * @tc.steps: step2. deviceA call sync and wait
1174 * @tc.expected: step2. sync should return OK.
1175 */
1176 std::map<std::string, DBStatus> result;
1177 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
1178
1179 /**
1180 * @tc.expected: step3. onComplete should be called, and status is time_out
1181 */
1182 ASSERT_TRUE(result.size() == devices.size());
1183 for (const auto &pair : result) {
1184 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1185 EXPECT_TRUE(pair.second == TIME_OUT);
1186 }
1187 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1188 }
1189
1190 /**
1191 * @tc.name: SyncRetry003
1192 * @tc.desc: use sync retry sync use push by compress
1193 * @tc.type: FUNC
1194 * @tc.require:
1195 * @tc.author: zhuwentao
1196 */
1197 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry003, TestSize.Level3)
1198 {
1199 if (g_kvDelegatePtr != nullptr) {
1200 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1201 g_kvDelegatePtr = nullptr;
1202 }
1203 /**
1204 * @tc.steps: step1. open db use Compress
1205 * @tc.expected: step1, Pragma return OK.
1206 */
1207 KvStoreNbDelegate::Option option;
1208 option.isNeedCompressOnSync = true;
1209 option.compressionRate = 70;
1210 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1211 ASSERT_TRUE(g_kvDelegateStatus == OK);
1212 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1213
1214 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1215 std::vector<std::string> devices;
1216 devices.push_back(g_deviceB->GetDeviceId());
1217
1218 /**
1219 * @tc.steps: step2. set sync retry
1220 * @tc.expected: step2, Pragma return OK.
1221 */
1222 int pragmaData = 1;
1223 PragmaData input = static_cast<PragmaData>(&pragmaData);
1224 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1225
1226 /**
1227 * @tc.steps: step3. deviceA put {k1, v1}, {k2, v2}
1228 */
1229 ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
1230
1231 /**
1232 * @tc.steps: step4. deviceA call sync and wait
1233 * @tc.expected: step4. sync should return OK.
1234 */
1235 std::map<std::string, DBStatus> result;
1236 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1237
1238 /**
1239 * @tc.expected: step5. onComplete should be called, and status is time_out
1240 */
1241 ASSERT_TRUE(result.size() == devices.size());
1242 for (const auto &pair : result) {
1243 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1244 EXPECT_TRUE(pair.second == OK);
1245 }
1246 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1247 }
1248
1249 /**
1250 * @tc.name: SyncRetry004
1251 * @tc.desc: use query sync retry sync use push
1252 * @tc.type: FUNC
1253 * @tc.require:
1254 * @tc.author: zhuwentao
1255 */
1256 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry004, TestSize.Level3)
1257 {
1258 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1259 std::vector<std::string> devices;
1260 devices.push_back(g_deviceB->GetDeviceId());
1261
1262 /**
1263 * @tc.steps: step1. set sync retry
1264 * @tc.expected: step1, Pragma return OK.
1265 */
1266 int pragmaData = 1;
1267 PragmaData input = static_cast<PragmaData>(&pragmaData);
1268 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1269
1270 /**
1271 * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
1272 */
1273 for (int i = 0; i < 5; i++) {
1274 Key key = DistributedDBToolsUnitTest::GetRandPrefixKey({'a', 'b'}, 128); // rand num 1024 for test
1275 Value value;
1276 DistributedDBToolsUnitTest::GetRandomKeyValue(value, 256u);
1277 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
1278 }
1279
1280 /**
1281 * @tc.steps: step3. deviceA call sync and wait
1282 * @tc.expected: step3. sync should return OK.
1283 */
1284 std::map<std::string, DBStatus> result;
1285 std::vector<uint8_t> prefixKey({'a', 'b'});
1286 Query query = Query::Select().PrefixKey(prefixKey);
1287 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query) == OK);
1288
1289 /**
1290 * @tc.expected: step4. onComplete should be called, and status is time_out
1291 */
1292 ASSERT_TRUE(result.size() == devices.size());
1293 for (const auto &pair : result) {
1294 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1295 EXPECT_TRUE(pair.second == OK);
1296 }
1297 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1298 }
1299
1300 /**
1301 * @tc.name: SyncRetry005
1302 * @tc.desc: use sync retry sync use pull by compress
1303 * @tc.type: FUNC
1304 * @tc.require:
1305 * @tc.author: zhangqiquan
1306 */
1307 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry005, TestSize.Level3)
1308 {
1309 if (g_kvDelegatePtr != nullptr) {
1310 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1311 g_kvDelegatePtr = nullptr;
1312 }
1313 /**
1314 * @tc.steps: step1. open db use Compress
1315 * @tc.expected: step1, Pragma return OK.
1316 */
1317 KvStoreNbDelegate::Option option;
1318 option.isNeedCompressOnSync = true;
1319 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1320 ASSERT_TRUE(g_kvDelegateStatus == OK);
1321 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1322
1323 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1324 std::vector<std::string> devices;
1325 devices.push_back(g_deviceB->GetDeviceId());
1326
1327 /**
1328 * @tc.steps: step2. set sync retry
1329 * @tc.expected: step2, Pragma return OK.
1330 */
1331 int pragmaData = 1;
1332 PragmaData input = static_cast<PragmaData>(&pragmaData);
1333 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1334
1335 /**
1336 * @tc.steps: step3. deviceA call sync and wait
1337 * @tc.expected: step3. sync should return OK.
1338 */
1339 std::map<std::string, DBStatus> result;
1340 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
1341
1342 /**
1343 * @tc.expected: step4. onComplete should be called, and status is time_out
1344 */
1345 ASSERT_TRUE(result.size() == devices.size());
1346 for (const auto &pair : result) {
1347 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1348 EXPECT_EQ(pair.second, OK);
1349 }
1350 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1351 }
1352
1353 /**
1354 * @tc.name: ReSetWatchDogTest001
1355 * @tc.desc: trigger resetWatchDog while pull
1356 * @tc.type: FUNC
1357 * @tc.require:
1358 * @tc.author: zhuwentao
1359 */
1360 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, ReSetWaterDogTest001, TestSize.Level3)
1361 {
1362 ASSERT_NO_FATAL_FAILURE(ReSetWaterDogTest001());
1363 }
1364
1365 /**
1366 * @tc.name: RebuildSync001
1367 * @tc.desc: rebuild db and sync again
1368 * @tc.type: FUNC
1369 * @tc.require:
1370 * @tc.author: zhuwentao
1371 */
1372 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync001, TestSize.Level3)
1373 {
1374 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1375 /**
1376 * @tc.steps: step1. sync deviceB data to A and check data
1377 * * @tc.expected: step1. interface return ok
1378 */
1379 Key key1 = {'1'};
1380 Key key2 = {'2'};
1381 Value value = {'1'};
1382 Timestamp currentTime;
1383 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1384 EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1385 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1386 EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
1387 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1388 std::this_thread::sleep_for(std::chrono::seconds(1));
1389
1390 Value actualValue;
1391 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1392 EXPECT_EQ(actualValue, value);
1393 actualValue.clear();
1394 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1395 EXPECT_EQ(actualValue, value);
1396 /**
1397 * @tc.steps: step2. delete db and rebuild
1398 * * @tc.expected: step2. interface return ok
1399 */
1400 g_mgr.CloseKvStore(g_kvDelegatePtr);
1401 g_kvDelegatePtr = nullptr;
1402 ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1403 KvStoreNbDelegate::Option option;
1404 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1405 ASSERT_TRUE(g_kvDelegateStatus == OK);
1406 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1407 /**
1408 * @tc.steps: step3. sync to device A again
1409 * * @tc.expected: step3. sync ok
1410 */
1411 value = {'2'};
1412 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1413 EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1414 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1415 std::this_thread::sleep_for(std::chrono::seconds(1));
1416 /**
1417 * @tc.steps: step4. check data in device A
1418 * * @tc.expected: step4. check ok
1419 */
1420 actualValue.clear();
1421 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1422 EXPECT_EQ(actualValue, value);
1423 }
1424
1425 /**
1426 * @tc.name: RebuildSync002
1427 * @tc.desc: test clear remote data when receive data
1428 * @tc.type: FUNC
1429 * @tc.require:
1430 * @tc.author: zhuwentao
1431 */
1432 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync002, TestSize.Level1)
1433 {
1434 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1435 std::vector<std::string> devices;
1436 devices.push_back(g_deviceB->GetDeviceId());
1437 /**
1438 * @tc.steps: step1. device A SET_WIPE_POLICY
1439 * * @tc.expected: step1. interface return ok
1440 */
1441 int pragmaData = 2; // 2 means enable
1442 PragmaData input = static_cast<PragmaData>(&pragmaData);
1443 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_WIPE_POLICY, input) == OK);
1444 /**
1445 * @tc.steps: step2. sync deviceB data to A and check data
1446 * * @tc.expected: step2. interface return ok
1447 */
1448 Key key1 = {'1'};
1449 Key key2 = {'2'};
1450 Key key3 = {'3'};
1451 Key key4 = {'4'};
1452 Value value = {'1'};
1453 Timestamp currentTime;
1454 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1455 EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1456 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1457 EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
1458 EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
1459 /**
1460 * @tc.steps: step3. deviceA call pull sync
1461 * @tc.expected: step3. sync should return OK.
1462 */
1463 std::map<std::string, DBStatus> result;
1464 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result) == OK);
1465
1466 /**
1467 * @tc.expected: step4. onComplete should be called, check data
1468 */
1469 ASSERT_TRUE(result.size() == devices.size());
1470 for (const auto &pair : result) {
1471 EXPECT_TRUE(pair.second == OK);
1472 }
1473 Value actualValue;
1474 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1475 EXPECT_EQ(actualValue, value);
1476 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1477 EXPECT_EQ(actualValue, value);
1478 /**
1479 * @tc.steps: step5. device B rebuild and put some data
1480 * * @tc.expected: step5. rebuild ok
1481 */
1482 if (g_deviceB != nullptr) {
1483 delete g_deviceB;
1484 g_deviceB = nullptr;
1485 }
1486 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
1487 ASSERT_TRUE(g_deviceB != nullptr);
1488 VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
1489 ASSERT_TRUE(syncInterfaceB != nullptr);
1490 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
1491 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1492 EXPECT_EQ(g_deviceB->PutData(key3, value, currentTime, 0), E_OK);
1493 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1494 EXPECT_EQ(g_deviceB->PutData(key4, value, currentTime, 0), E_OK);
1495 /**
1496 * @tc.steps: step6. sync to device A again and check data
1497 * * @tc.expected: step6. sync ok
1498 */
1499 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1500 EXPECT_EQ(g_kvDelegatePtr->Get(key3, actualValue), OK);
1501 EXPECT_EQ(actualValue, value);
1502 EXPECT_EQ(g_kvDelegatePtr->Get(key4, actualValue), OK);
1503 EXPECT_EQ(actualValue, value);
1504 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1505 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), NOT_FOUND);
1506 }
1507
1508 /**
1509 * @tc.name: RebuildSync003
1510 * @tc.desc: test clear history data when receive ack
1511 * @tc.type: FUNC
1512 * @tc.require:
1513 * @tc.author: zhuwentao
1514 */
1515 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync003, TestSize.Level1)
1516 {
1517 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1518 /**
1519 * @tc.steps: step1. sync deviceB data to A and check data
1520 * * @tc.expected: step1. interface return ok
1521 */
1522 Key key1 = {'1'};
1523 Key key2 = {'2'};
1524 Key key3 = {'3'};
1525 Key key4 = {'4'};
1526 Value value = {'1'};
1527 EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1528 EXPECT_EQ(g_deviceB->PutData(key2, value, 2u, 0), E_OK); // 2: timestamp
1529 EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
1530 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1531 Value actualValue;
1532 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1533 EXPECT_EQ(actualValue, value);
1534 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1535 EXPECT_EQ(actualValue, value);
1536 VirtualDataItem item;
1537 EXPECT_EQ(g_deviceB->GetData(key3, item), E_OK);
1538 EXPECT_EQ(item.value, value);
1539 /**
1540 * @tc.steps: step2. device B sync to device A,but make it failed
1541 * * @tc.expected: step2. interface return ok
1542 */
1543 EXPECT_EQ(g_deviceB->PutData(key4, value, 3u, 0), E_OK); // 3: timestamp
1544 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_A, DATA_SYNC_MESSAGE);
1545 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1546 /**
1547 * @tc.steps: step3. device B set delay send time
1548 * * @tc.expected: step3. interface return ok
1549 */
1550 std::set<std::string> delayDevice = {DEVICE_B};
1551 g_communicatorAggregator->SetSendDelayInfo(3000u, DATA_SYNC_MESSAGE, 1u, 0u, delayDevice); // delay 3000ms one time
1552 /**
1553 * @tc.steps: step4. device A rebuilt, device B push data to A and set clear remote data mark into context after 1s
1554 * * @tc.expected: step4. interface return ok
1555 */
1556 g_deviceB->SetClearRemoteStaleData(true);
1557 g_mgr.CloseKvStore(g_kvDelegatePtr);
1558 g_kvDelegatePtr = nullptr;
1559 ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1560 KvStoreNbDelegate::Option option;
1561 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1562 ASSERT_TRUE(g_kvDelegateStatus == OK);
1563 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1564 std::map<std::string, DBStatus> result;
1565 std::vector<std::string> devices = {g_deviceB->GetDeviceId()};
1566 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1567 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1568 /**
1569 * @tc.steps: step5. device B sync to A, make it clear history data and check data
1570 * * @tc.expected: step5. interface return ok
1571 */
1572 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1573 EXPECT_EQ(g_deviceB->GetData(key3, item), -E_NOT_FOUND);
1574 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1575 EXPECT_EQ(actualValue, value);
1576 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1577 EXPECT_EQ(actualValue, value);
1578 g_communicatorAggregator->ResetSendDelayInfo();
1579 }
1580
1581 /**
1582 * @tc.name: RebuildSync004
1583 * @tc.desc: test WIPE_STALE_DATA mode when peers rebuilt db
1584 * @tc.type: FUNC
1585 * @tc.require:
1586 * @tc.author: zhangtao
1587 */
1588 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync004, TestSize.Level1)
1589 {
1590 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1591 /**
1592 * @tc.steps: step1. sync deviceB data to A and check data
1593 * * @tc.expected: step1. interface return ok
1594 */
1595 Key key1 = {'1'};
1596 Key key2 = {'2'};
1597 Key key3 = {'3'};
1598 Key key4 = {'4'};
1599 Value value = {'1'};
1600 EXPECT_EQ(g_kvDelegatePtr->Put(key1, value), OK);
1601 EXPECT_EQ(g_kvDelegatePtr->Put(key2, value), OK);
1602 EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
1603 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1604 Value actualValue;
1605 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1606 EXPECT_EQ(actualValue, value);
1607 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1608 EXPECT_EQ(actualValue, value);
1609 EXPECT_EQ(g_kvDelegatePtr->Get(key3, actualValue), OK);
1610 EXPECT_EQ(actualValue, value);
1611 VirtualDataItem item;
1612 EXPECT_EQ(g_deviceB->GetData(key1, item), E_OK);
1613 EXPECT_EQ(item.value, value);
1614 EXPECT_EQ(g_deviceB->GetData(key2, item), E_OK);
1615 EXPECT_EQ(item.value, value);
1616 EXPECT_EQ(g_deviceB->GetData(key3, item), E_OK);
1617 EXPECT_EQ(item.value, value);
1618
1619 /**
1620 * @tc.steps: step2. device A rebuilt, device B push data to A and set clear remote data mark into context after 1s
1621 * * @tc.expected: step2. interface return ok
1622 */
1623 g_deviceB->SetClearRemoteStaleData(true);
1624 EXPECT_EQ(g_deviceB->PutData(key4, value, 3u, 2), E_OK); // 3: timestamp
1625
1626 VirtualDataItem item2;
1627 EXPECT_EQ(g_deviceB->GetData(key4, item2), E_OK);
1628 EXPECT_EQ(item2.value, value);
1629 g_mgr.CloseKvStore(g_kvDelegatePtr);
1630 g_kvDelegatePtr = nullptr;
1631 ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1632 KvStoreNbDelegate::Option option;
1633 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1634 ASSERT_TRUE(g_kvDelegateStatus == OK);
1635 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1636
1637 /**
1638 * @tc.steps: step3. device B sync to A, make it clear history data and check data
1639 * * @tc.expected: step3. interface return ok
1640 */
1641 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1642 EXPECT_EQ(g_deviceB->GetData(key2, item), -E_NOT_FOUND);
1643 EXPECT_EQ(g_deviceB->GetData(key3, item), -E_NOT_FOUND);
1644 EXPECT_EQ(g_deviceB->GetData(key4, item2), E_OK);
1645 EXPECT_EQ(item2.value, value);
1646 EXPECT_EQ(g_kvDelegatePtr->Get(key4, actualValue), OK);
1647 EXPECT_EQ(actualValue, value);
1648 }
1649
1650 /**
1651 * @tc.name: RemoveDeviceData001
1652 * @tc.desc: call rekey and removeDeviceData Concurrently
1653 * @tc.type: FUNC
1654 * @tc.require:
1655 * @tc.author: zhuwentao
1656 */
1657 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RemoveDeviceData001, TestSize.Level1)
1658 {
1659 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1660 /**
1661 * @tc.steps: step1. sync deviceB data to A
1662 * * @tc.expected: step1. interface return ok
1663 */
1664 Key key1 = {'1'};
1665 Key key2 = {'2'};
1666 Value value = {'1'};
1667 g_deviceB->PutData(key1, value, 1, 0);
1668 g_deviceB->PutData(key2, value, 2, 0);
1669 g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true);
1670
1671 Value actualValue;
1672 g_kvDelegatePtr->Get(key1, actualValue);
1673 EXPECT_EQ(actualValue, value);
1674 actualValue.clear();
1675 g_kvDelegatePtr->Get(key2, actualValue);
1676 EXPECT_EQ(actualValue, value);
1677 /**
1678 * @tc.steps: step2. call Rekey and RemoveDeviceData Concurrently
1679 * * @tc.expected: step2. interface return ok
1680 */
__anon762f6f831302() 1681 std::thread thread1([]() {
1682 CipherPassword passwd3;
1683 std::vector<uint8_t> passwdVect = {'p', 's', 'd', 'z'};
1684 passwd3.SetValue(passwdVect.data(), passwdVect.size());
1685 g_kvDelegatePtr->Rekey(passwd3);
1686 });
__anon762f6f831402() 1687 std::thread thread2([]() {
1688 g_kvDelegatePtr->RemoveDeviceData(g_deviceB->GetDeviceId());
1689 });
1690 thread1.join();
1691 thread2.join();
1692 }
1693
1694 /**
1695 * @tc.name: DeviceOfflineSyncTask001
1696 * @tc.desc: Test sync task when device offline and close db Concurrently
1697 * @tc.type: FUNC
1698 * @tc.require:
1699 * @tc.author: zhuwentao
1700 */
1701 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask001, TestSize.Level3)
1702 {
1703 DBStatus status = OK;
1704 std::vector<std::string> devices;
1705 devices.push_back(g_deviceB->GetDeviceId());
1706
1707 /**
1708 * @tc.steps: step1. deviceA put {k1, v1}
1709 */
1710 Key key = {'1'};
1711 Value value = {'1'};
1712 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1713
1714 /**
1715 * @tc.steps: step2. deviceA set auto sync and put some key/value
1716 * @tc.expected: step2. interface should return OK.
1717 */
1718 bool autoSync = true;
1719 PragmaData data = static_cast<PragmaData>(&autoSync);
1720 status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1721 ASSERT_EQ(status, OK);
1722
1723 Key key1 = {'2'};
1724 Key key2 = {'3'};
1725 Key key3 = {'4'};
1726 Key key4 = {'5'};
1727 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1728 ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value) == OK);
1729 ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value) == OK);
1730 ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value) == OK);
1731 ASSERT_TRUE(g_kvDelegatePtr->Put(key4, value) == OK);
1732 /**
1733 * @tc.steps: step3. device offline and close db Concurrently
1734 * @tc.expected: step3. interface should return OK.
1735 */
__anon762f6f831502() 1736 std::thread thread1([]() {
1737 g_mgr.CloseKvStore(g_kvDelegatePtr);
1738 g_kvDelegatePtr = nullptr;
1739 });
__anon762f6f831602() 1740 std::thread thread2([]() {
1741 g_deviceB->Offline();
1742 });
1743 thread1.join();
1744 thread2.join();
1745 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1746 ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1747 }
1748
1749 /**
1750 * @tc.name: DeviceOfflineSyncTask002
1751 * @tc.desc: Test sync task when autoSync and close db Concurrently
1752 * @tc.type: FUNC
1753 * @tc.require:
1754 * @tc.author: zhuwentao
1755 */
1756 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask002, TestSize.Level3)
1757 {
1758 DBStatus status = OK;
1759 g_deviceC->Offline();
1760
1761 /**
1762 * @tc.steps: step1. deviceA put {k1, v1}
1763 */
1764 Key key = {'1'};
1765 Value value = {'1'};
1766 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1767
1768 /**
1769 * @tc.steps: step2. deviceA set auto sync and put some key/value
1770 * @tc.expected: step2. interface should return OK.
1771 */
1772 bool autoSync = true;
1773 PragmaData data = static_cast<PragmaData>(&autoSync);
1774 status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1775 ASSERT_EQ(status, OK);
1776 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME * 2));
1777
1778 Key key1 = {'2'};
1779 Key key2 = {'3'};
1780 Key key3 = {'4'};
1781 ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value) == OK);
1782 ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value) == OK);
1783 ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value) == OK);
1784 /**
1785 * @tc.steps: step3. close db
1786 * @tc.expected: step3. interface should return OK.
1787 */
1788 g_mgr.CloseKvStore(g_kvDelegatePtr);
1789 g_kvDelegatePtr = nullptr;
1790 ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1791 }
1792
1793 /**
1794 * @tc.name: DeviceOfflineSyncTask003
1795 * @tc.desc: Test sync task when device offline after call sync
1796 * @tc.type: FUNC
1797 * @tc.require:
1798 * @tc.author: zhuwentao
1799 */
1800 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask003, TestSize.Level3)
1801 {
1802 std::vector<std::string> devices;
1803 devices.push_back(g_deviceB->GetDeviceId());
1804
1805 /**
1806 * @tc.steps: step1. deviceA put {k1, v1}
1807 */
1808 Key key = {'1'};
1809 Value value = {'1'};
1810 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1811 /**
1812 * @tc.steps: step2. device offline after call sync
1813 * @tc.expected: step2. interface should return OK.
1814 */
1815 Query query = Query::Select().PrefixKey(key);
1816 ASSERT_TRUE(g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, query, false) == OK);
1817 std::this_thread::sleep_for(std::chrono::milliseconds(15)); // wait for 15ms
1818 g_deviceB->Offline();
1819 }
1820
1821 /**
1822 * @tc.name: GetSyncDataFail001
1823 * @tc.desc: test get sync data failed when sync
1824 * @tc.type: FUNC
1825 * @tc.require:
1826 * @tc.author: zhuwentao
1827 */
1828 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail001, TestSize.Level1)
1829 {
1830 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1831 /**
1832 * @tc.steps: step1. device B set get data errCode control and put some data
1833 * * @tc.expected: step1. interface return ok
1834 */
1835 g_deviceB->SetGetDataErrCode(1, -E_BUSY, true);
1836 Key key1 = {'1'};
1837 Value value = {'1'};
1838 EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1839 /**
1840 * @tc.steps: step2. device B sync to device A and check data
1841 * * @tc.expected: step2. interface return ok
1842 */
1843 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1844 Value actualValue;
1845 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1846 g_deviceB->ResetDataControl();
1847 }
1848
1849 /**
1850 * @tc.name: GetSyncDataFail002
1851 * @tc.desc: test get sync data failed when sync with large data
1852 * @tc.type: FUNC
1853 * @tc.require:
1854 * @tc.author: zhuwentao
1855 */
1856 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail002, TestSize.Level1)
1857 {
1858 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1859 /**
1860 * @tc.steps: step1. device B set get data errCode control and put some data
1861 * * @tc.expected: step1. interface return ok
1862 */
1863 g_deviceB->SetGetDataErrCode(2, -E_BUSY, true);
1864 int totalSize = 4000u;
1865 std::vector<Entry> entries;
1866 std::vector<Key> keys;
1867 const int keyLen = 10; // 20 Bytes
1868 const int valueLen = 10; // 20 Bytes
1869 DistributedDBUnitTest::GenerateRecords(totalSize, entries, keys, keyLen, valueLen);
1870 uint32_t i = 1u;
1871 for (const auto &entry : entries) {
1872 EXPECT_EQ(g_deviceB->PutData(entry.key, entry.value, i, 0), E_OK);
1873 i++;
1874 }
1875 /**
1876 * @tc.steps: step2. device B sync to device A and check data
1877 * * @tc.expected: step2. interface return ok
1878 */
1879 std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
1880 std::map<std::string, DBStatus> result;
1881 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
1882 Value actualValue;
1883 for (int j = 1u; j <= totalSize; j++) {
1884 if (j > totalSize / 2) {
1885 EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), NOT_FOUND);
1886 } else {
1887 EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), OK);
1888 }
1889 }
1890 g_deviceB->ResetDataControl();
1891 }
1892
1893 /**
1894 * @tc.name: GetSyncDataFail003
1895 * @tc.desc: test get sync data E_EKEYREVOKED failed in push_and_pull sync
1896 * @tc.type: FUNC
1897 * @tc.require:
1898 * @tc.author: zhuwentao
1899 */
1900 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail003, TestSize.Level1)
1901 {
1902 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1903 /**
1904 * @tc.steps: step1. device B set get data errCode control and put some data
1905 * * @tc.expected: step1. interface return ok
1906 */
1907 g_deviceB->SetGetDataErrCode(1, -E_EKEYREVOKED, true);
1908 Key key1 = {'1'};
1909 Key key2 = {'3'};
1910 Value value = {'1'};
1911 EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1912 EXPECT_EQ(g_kvDelegatePtr->Put(key2, value), OK);
1913 /**
1914 * @tc.steps: step2. device B sync to device A and check data
1915 * * @tc.expected: step2. interface return ok
1916 */
1917 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1918 Value actualValue;
1919 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1920 VirtualDataItem item;
1921 EXPECT_EQ(g_deviceB->GetData(key2, item), E_OK);
1922 g_deviceB->ResetDataControl();
1923 }
1924
1925 /**
1926 * @tc.name: GetSyncDataFail004
1927 * @tc.desc: test get sync data E_EKEYREVOKED failed in push_and_pull sync
1928 * @tc.type: FUNC
1929 * @tc.require:
1930 * @tc.author: zhuwentao
1931 */
1932 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail004, TestSize.Level1)
1933 {
1934 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1935 /**
1936 * @tc.steps: step1. device B set get data errCode control and put some data
1937 * * @tc.expected: step1. interface return ok
1938 */
1939 g_deviceB->SetGetDataErrCode(2, -E_EKEYREVOKED, true);
1940 int totalSize = 4000u;
1941 std::vector<Entry> entries;
1942 std::vector<Key> keys;
1943 const int keyLen = 10; // 20 Bytes
1944 const int valueLen = 10; // 20 Bytes
1945 DistributedDBUnitTest::GenerateRecords(totalSize, entries, keys, keyLen, valueLen);
1946 uint32_t i = 1u;
1947 for (const auto &entry : entries) {
1948 EXPECT_EQ(g_deviceB->PutData(entry.key, entry.value, i, 0), E_OK);
1949 i++;
1950 }
1951 Key key = {'a', 'b', 'c'};
1952 Value value = {'1'};
1953 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
1954 /**
1955 * @tc.steps: step2. device B sync to device A and check data
1956 * * @tc.expected: step2. interface return ok
1957 */
1958 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1959 std::this_thread::sleep_for(std::chrono::seconds(1));
1960 Value actualValue;
1961 for (int j = 1u; j <= totalSize; j++) {
1962 if (j > totalSize / 2) {
1963 EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), NOT_FOUND);
1964 } else {
1965 EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), OK);
1966 }
1967 }
1968 VirtualDataItem item;
1969 EXPECT_EQ(g_deviceB->GetData(key, item), E_OK);
1970 g_deviceB->ResetDataControl();
1971 }
1972
1973 /**
1974 * @tc.name: InterceptDataFail001
1975 * @tc.desc: test intercept data failed when sync
1976 * @tc.type: FUNC
1977 * @tc.require:
1978 * @tc.author: zhuwentao
1979 */
1980 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, InterceptDataFail001, TestSize.Level1)
1981 {
1982 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1983 /**
1984 * @tc.steps: step1. device A set intercept data errCode and put some data
1985 * * @tc.expected: step1. interface return ok
1986 */
1987 g_kvDelegatePtr->SetPushDataInterceptor(
__anon762f6f831702(InterceptedData &data, const std::string &sourceID, const std::string &targetID) 1988 [](InterceptedData &data, const std::string &sourceID, const std::string &targetID) {
1989 int errCode = OK;
1990 auto entries = data.GetEntries();
1991 LOGD("====here111,size=%d", entries.size());
1992 for (size_t i = 0; i < entries.size(); i++) {
1993 Key newKey;
1994 errCode = data.ModifyKey(i, newKey);
1995 if (errCode != OK) {
1996 break;
1997 }
1998 }
1999 return errCode;
2000 }
2001 );
2002 Key key = {'1'};
2003 Value value = {'1'};
2004 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
2005 /**
2006 * @tc.steps: step2. device A sync to device B and check data
2007 * * @tc.expected: step2. interface return ok
2008 */
2009 std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2010 std::map<std::string, DBStatus> result;
2011 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
2012 ASSERT_TRUE(result.size() == devices.size());
2013 for (const auto &pair : result) {
2014 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2015 EXPECT_TRUE(pair.second == INTERCEPT_DATA_FAIL);
2016 }
2017 VirtualDataItem item;
2018 EXPECT_EQ(g_deviceB->GetData(key, item), -E_NOT_FOUND);
2019 }
2020
2021 /**
2022 * @tc.name: InterceptDataFail002
2023 * @tc.desc: test intercept data failed when sync
2024 * @tc.type: FUNC
2025 * @tc.require:
2026 * @tc.author: zhangqiquan
2027 */
2028 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, InterceptDataFail002, TestSize.Level1)
2029 {
2030 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2031 /**
2032 * @tc.steps: step1. device A set intercept data errCode and B put some data
2033 * @tc.expected: step1. interface return ok
2034 */
2035 g_kvDelegatePtr->SetReceiveDataInterceptor(
__anon762f6f831802(InterceptedData &data, const std::string &sourceID, const std::string &targetID) 2036 [](InterceptedData &data, const std::string &sourceID, const std::string &targetID) {
2037 auto entries = data.GetEntries();
2038 LOGD("====on receive,size=%d", entries.size());
2039 for (size_t i = 0; i < entries.size(); i++) {
2040 Key newKey;
2041 int errCode = data.ModifyKey(i, newKey);
2042 if (errCode != OK) {
2043 return errCode;
2044 }
2045 }
2046 return E_OK;
2047 }
2048 );
2049 Key key = {'1'};
2050 Value value = {'1'};
2051 g_deviceB->PutData(key, value, 1u, 0); // 1 is timestamp
2052 /**
2053 * @tc.steps: step2. device A sync to device B and check data
2054 * @tc.expected: step2. interface return ok
2055 */
2056 std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2057 std::map<std::string, DBStatus> result;
2058 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
2059 ASSERT_TRUE(result.size() == devices.size());
2060 for (const auto &pair : result) {
2061 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2062 EXPECT_EQ(pair.second, INTERCEPT_DATA_FAIL);
2063 }
2064 Value actualValue;
2065 EXPECT_EQ(g_kvDelegatePtr->Get(key, actualValue), NOT_FOUND);
2066 }
2067
2068 /**
2069 * @tc.name: InterceptData001
2070 * @tc.desc: test intercept receive data when sync
2071 * @tc.type: FUNC
2072 * @tc.require:
2073 * @tc.author: zhangqiquan
2074 */
2075 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, InterceptData001, TestSize.Level0)
2076 {
2077 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2078 /**
2079 * @tc.steps: step1. device A set intercept data errCode and B put some data
2080 * @tc.expected: step1. interface return ok
2081 */
2082 g_kvDelegatePtr->SetReceiveDataInterceptor(
__anon762f6f831902(InterceptedData &data, const std::string &sourceID, const std::string &targetID) 2083 [](InterceptedData &data, const std::string &sourceID, const std::string &targetID) {
2084 auto entries = data.GetEntries();
2085 LOGD("====on receive,size=%d", entries.size());
2086 for (size_t i = 0; i < entries.size(); i++) {
2087 Key newKey = {'2'};
2088 int errCode = data.ModifyKey(i, newKey);
2089 if (errCode != OK) {
2090 return errCode;
2091 }
2092 Value newValue = {'3'};
2093 errCode = data.ModifyValue(i, newValue);
2094 if (errCode != OK) {
2095 return errCode;
2096 }
2097 }
2098 return E_OK;
2099 }
2100 );
2101 Key key = {'1'};
2102 Value value = {'1'};
2103 g_deviceB->PutData(key, value, 1u, 0); // 1 is timestamp
2104 /**
2105 * @tc.steps: step2. device A sync to device B and check data
2106 * @tc.expected: step2. interface return ok
2107 */
2108 std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2109 std::map<std::string, DBStatus> result;
2110 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
2111 ASSERT_TRUE(result.size() == devices.size());
2112 for (const auto &pair : result) {
2113 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2114 EXPECT_EQ(pair.second, OK);
2115 }
2116 Value actualValue;
2117 EXPECT_EQ(g_kvDelegatePtr->Get(key, actualValue), NOT_FOUND);
2118 key = {'2'};
2119 EXPECT_EQ(g_kvDelegatePtr->Get(key, actualValue), OK);
2120 value = {'3'};
2121 EXPECT_EQ(actualValue, value);
2122 }
2123
2124 /**
2125 * @tc.name: UpdateKey001
2126 * @tc.desc: test update key can effect local data and sync data, without delete data
2127 * @tc.type: FUNC
2128 * @tc.require:
2129 * @tc.author: zhangqiquan
2130 */
2131 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, UpdateKey001, TestSize.Level1)
2132 {
2133 /**
2134 * @tc.steps: step1. device A set sync data (k1, v1) local data (k2, v2) (k3, v3) and delete (k4, v4)
2135 * @tc.expected: step1. put data return ok
2136 */
2137 Key k1 = {'k', '1'};
2138 Value v1 = {'v', '1'};
2139 g_deviceB->PutData(k1, v1, 1, 0);
2140 ASSERT_EQ(g_deviceB->Sync(SyncMode::SYNC_MODE_PUSH_ONLY, true), E_OK);
2141 Value actualValue;
2142 EXPECT_EQ(g_kvDelegatePtr->Get(k1, actualValue), OK);
2143 EXPECT_EQ(v1, actualValue);
2144 Key k2 = {'k', '2'};
2145 Value v2 = {'v', '2'};
2146 Key k3 = {'k', '3'};
2147 Value v3 = {'v', '3'};
2148 Key k4 = {'k', '4'};
2149 Value v4 = {'v', '4'};
2150 EXPECT_EQ(g_kvDelegatePtr->Put(k2, v2), OK);
2151 EXPECT_EQ(g_kvDelegatePtr->Put(k3, v3), OK);
2152 EXPECT_EQ(g_kvDelegatePtr->Put(k4, v4), OK);
2153 EXPECT_EQ(g_kvDelegatePtr->Delete(k4), OK);
2154 /**
2155 * @tc.steps: step2. device A update key and set
2156 * @tc.expected: step2. put data return ok
2157 */
__anon762f6f831a02(const Key &originKey, Key &newKey) 2158 DBStatus status = g_kvDelegatePtr->UpdateKey([](const Key &originKey, Key &newKey) {
2159 newKey = originKey;
2160 newKey.push_back('0');
2161 });
2162 EXPECT_EQ(status, OK);
2163 k1.push_back('0');
2164 k2.push_back('0');
2165 k3.push_back('0');
2166 EXPECT_EQ(g_kvDelegatePtr->Get(k1, actualValue), OK);
2167 EXPECT_EQ(v1, actualValue);
2168 EXPECT_EQ(g_kvDelegatePtr->Get(k2, actualValue), OK);
2169 EXPECT_EQ(v2, actualValue);
2170 EXPECT_EQ(g_kvDelegatePtr->Get(k3, actualValue), OK);
2171 EXPECT_EQ(v3, actualValue);
2172 }
2173
2174 /**
2175 * @tc.name: MetaBusy001
2176 * @tc.desc: test sync normal when update water mark busy
2177 * @tc.type: FUNC
2178 * @tc.require:
2179 * @tc.author: zhangqiquan
2180 */
2181 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, MetaBusy001, TestSize.Level1)
2182 {
2183 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2184 Key key = {'1'};
2185 Value value = {'1'};
2186 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
2187 std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2188 std::map<std::string, DBStatus> result;
2189 ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result), OK);
2190 ASSERT_EQ(result.size(), devices.size());
2191 for (const auto &pair : result) {
2192 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2193 EXPECT_TRUE(pair.second == OK);
2194 }
2195 value = {'2'};
2196 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
__anon762f6f831b02() 2197 g_deviceB->SetSaveDataCallback([] () {
2198 RuntimeContext::GetInstance()->ScheduleTask([]() {
2199 g_deviceB->EraseWaterMark("real_device");
2200 });
2201 std::this_thread::sleep_for(std::chrono::seconds(1));
2202 });
2203 EXPECT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result), OK);
2204 EXPECT_EQ(result.size(), devices.size());
2205 for (const auto &pair : result) {
2206 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2207 EXPECT_TRUE(pair.second == OK);
2208 }
2209 g_deviceB->SetSaveDataCallback(nullptr);
2210 RuntimeContext::GetInstance()->StopTaskPool();
2211 }
2212
2213 /**
2214 * @tc.name: TestErrCodePassthrough001
2215 * @tc.desc: Test ErrCode Passthrough when sync comm fail
2216 * @tc.type: FUNC
2217 * @tc.require:
2218 * @tc.author: suyue
2219 */
2220 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, TestErrCodePassthrough001, TestSize.Level1)
2221 {
2222 /**
2223 * @tc.steps: step1. device put data.
2224 * @tc.expected: step1. sync return OK.
2225 */
2226 std::vector<std::string> devices;
2227 devices.push_back(g_deviceB->GetDeviceId());
2228 devices.push_back(g_deviceC->GetDeviceId());
2229 Key key1 = {'1'};
2230 Value value1 = {'1'};
2231 ASSERT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
2232
2233 /**
2234 * @tc.steps: step2. call sync and mock commErrCode is E_BASE(positive number).
2235 * @tc.expected: step2. return COMM_FAILURE.
2236 */
2237 g_communicatorAggregator->MockCommErrCode(E_BASE);
2238 std::map<std::string, DBStatus> result;
2239 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2240 ASSERT_EQ(status, OK);
2241 for (const auto &pair : result) {
2242 LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, E_BASE);
2243 EXPECT_EQ(pair.second, COMM_FAILURE);
2244 }
2245
2246 /**
2247 * @tc.steps: step3. call sync and mock commErrCode is -E_BASE(negative number).
2248 * @tc.expected: step3. return -E_BASE.
2249 */
2250 g_communicatorAggregator->MockCommErrCode(-E_BASE);
2251 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2252 ASSERT_EQ(status, OK);
2253 for (const auto &pair : result) {
2254 LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, COMM_FAILURE);
2255 EXPECT_EQ(pair.second, static_cast<DBStatus>(-E_BASE));
2256 }
2257
2258 /**
2259 * @tc.steps: step4. call sync and mock commErrCode is INT_MAX.
2260 * @tc.expected: step4. return COMM_FAILURE.
2261 */
2262 g_communicatorAggregator->MockCommErrCode(INT_MAX);
2263 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2264 ASSERT_EQ(status, OK);
2265 for (const auto &pair : result) {
2266 LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, INT_MAX);
2267 EXPECT_EQ(pair.second, COMM_FAILURE);
2268 }
2269
2270 /**
2271 * @tc.steps: step5. call sync and mock commErrCode is -INT_MAX.
2272 * @tc.expected: step5. return -INT_MAX.
2273 */
2274 g_communicatorAggregator->MockCommErrCode(-INT_MAX);
2275 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2276 ASSERT_EQ(status, OK);
2277 for (const auto &pair : result) {
2278 LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, -INT_MAX);
2279 EXPECT_EQ(pair.second, -INT_MAX);
2280 }
2281 g_communicatorAggregator->MockCommErrCode(E_OK);
2282 }
2283
2284 /**
2285 * @tc.name: TestErrCodePassthrough002
2286 * @tc.desc: Test ErrCode Passthrough when sync time out and isDirectEnd is false
2287 * @tc.type: FUNC
2288 * @tc.require:
2289 * @tc.author: suyue
2290 */
2291 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, TestErrCodePassthrough002, TestSize.Level3)
2292 {
2293 /**
2294 * @tc.steps: step1. device put data.
2295 * @tc.expected: step1. sync return OK.
2296 */
2297 std::vector<std::string> devices;
2298 devices.push_back(g_deviceB->GetDeviceId());
2299 ASSERT_EQ(g_kvDelegatePtr->Put({'k', '1'}, {'v', '1'}), OK);
2300
2301 /**
2302 * @tc.steps: step2. set messageId invalid and isDirectEnd is false
2303 * @tc.expected: step2. make sure deviceA push data failed due to timeout
2304 */
__anon762f6f831d02(const std::string &target, DistributedDB::Message *msg) 2305 g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
2306 ASSERT_NE(msg, nullptr);
2307 if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
2308 msg->SetMessageId(INVALID_MESSAGE_ID);
2309 }
2310 });
2311 g_communicatorAggregator->MockDirectEndFlag(false);
2312
2313 /**
2314 * @tc.steps: step3. call sync and mock errCode is E_BASE(positive number).
2315 * @tc.expected: step3. return TIME_OUT.
2316 */
2317 std::map<std::string, DBStatus> result;
__anon762f6f831e02(const std::map<std::string, DBStatus> &map) 2318 auto callback = [&result](const std::map<std::string, DBStatus> &map) {
2319 result = map;
2320 };
2321 Query query = Query::Select().PrefixKey({'k', '1'});
2322 g_communicatorAggregator->MockCommErrCode(E_BASE);
2323 EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
2324 EXPECT_EQ(result.size(), devices.size());
2325 EXPECT_EQ(result[DEVICE_B], TIME_OUT);
2326
2327 /**
2328 * @tc.steps: step4. call sync and mock errCode is -E_BASE(negative number).
2329 * @tc.expected: step4. return -E_BASE.
2330 */
2331 g_communicatorAggregator->MockCommErrCode(-E_BASE);
2332 EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
2333 EXPECT_EQ(result.size(), devices.size());
2334 EXPECT_EQ(result[DEVICE_B], -E_BASE);
2335
2336 /**
2337 * @tc.steps: step5. call sync and mock errCode is E_OK(0).
2338 * @tc.expected: step5. return TIME_OUT.
2339 */
2340 g_communicatorAggregator->MockCommErrCode(E_OK);
2341 EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
2342 EXPECT_EQ(result.size(), devices.size());
2343 EXPECT_EQ(result[DEVICE_B], TIME_OUT);
2344
2345 /**
2346 * @tc.steps: step6. call sync and mock errCode is -INT_MAX.
2347 * @tc.expected: step6. return -INT_MAX.
2348 */
2349 g_communicatorAggregator->MockCommErrCode(-INT_MAX);
2350 EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
2351 EXPECT_EQ(result.size(), devices.size());
2352 EXPECT_EQ(result[DEVICE_B], -INT_MAX);
2353
2354 g_communicatorAggregator->RegOnDispatch(nullptr);
2355 g_communicatorAggregator->MockCommErrCode(E_OK);
2356 g_communicatorAggregator->MockDirectEndFlag(true);
2357 }
2358