1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <condition_variable>
17 #include <gtest/gtest.h>
18 #include <thread>
19
20 #include "db_constant.h"
21 #include "db_common.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "mock_sync_task_context.h"
27 #include "platform_specific.h"
28 #include "single_ver_data_sync.h"
29 #include "single_ver_kv_sync_task_context.h"
30
31 using namespace testing::ext;
32 using namespace DistributedDB;
33 using namespace DistributedDBUnitTest;
34 using namespace std;
35
36 namespace {
37 class TestSingleVerKvSyncTaskContext : public SingleVerKvSyncTaskContext {
38 public:
39 TestSingleVerKvSyncTaskContext() = default;
40 };
41 string g_testDir;
42 const string STORE_ID = "kv_stroe_complex_sync_test";
43 const int WAIT_TIME = 1000;
44 const std::string DEVICE_A = "real_device";
45 const std::string DEVICE_B = "deviceB";
46 const std::string DEVICE_C = "deviceC";
47 const std::string CREATE_SYNC_TABLE_SQL =
48 "CREATE TABLE IF NOT EXISTS sync_data(" \
49 "key BLOB NOT NULL," \
50 "value BLOB," \
51 "timestamp INT NOT NULL," \
52 "flag INT NOT NULL," \
53 "device BLOB," \
54 "ori_device BLOB," \
55 "hash_key BLOB PRIMARY KEY NOT NULL," \
56 "w_timestamp INT," \
57 "modify_time INT," \
58 "create_time INT" \
59 ");";
60
61 KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
62 KvStoreConfig g_config;
63 DistributedDBToolsUnitTest g_tool;
64 DBStatus g_kvDelegateStatus = INVALID_ARGS;
65 KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
66 VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
67 KvVirtualDevice *g_deviceB = nullptr;
68 KvVirtualDevice *g_deviceC = nullptr;
69
70 // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
71 auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
72 placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
73
PullSyncTest()74 void PullSyncTest()
75 {
76 DBStatus status = OK;
77 std::vector<std::string> devices;
78 devices.push_back(g_deviceB->GetDeviceId());
79
80 Key key = {'1'};
81 Key key2 = {'2'};
82 Value value = {'1'};
83 g_deviceB->PutData(key, value, 0, 0);
84 g_deviceB->PutData(key2, value, 1, 0);
85
86 std::map<std::string, DBStatus> result;
87 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
88 ASSERT_TRUE(status == OK);
89
90 ASSERT_TRUE(result.size() == devices.size());
91 for (const auto &pair : result) {
92 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
93 EXPECT_TRUE(pair.second == OK);
94 }
95 Value value3;
96 EXPECT_EQ(g_kvDelegatePtr->Get(key, value3), OK);
97 EXPECT_EQ(value3, value);
98 EXPECT_EQ(g_kvDelegatePtr->Get(key2, value3), OK);
99 EXPECT_EQ(value3, value);
100 }
101
CrudTest()102 void CrudTest()
103 {
104 vector<Entry> entries;
105 int totalSize = 10;
106 for (int i = 0; i < totalSize; i++) {
107 Entry entry;
108 entry.key.push_back(i);
109 entry.value.push_back('2');
110 entries.push_back(entry);
111 }
112 EXPECT_TRUE(g_kvDelegatePtr->PutBatch(entries) == OK);
113 for (const auto &entry : entries) {
114 Value resultvalue;
115 EXPECT_TRUE(g_kvDelegatePtr->Get(entry.key, resultvalue) == OK);
116 EXPECT_TRUE(resultvalue == entry.value);
117 }
118 for (int i = 0; i < totalSize / 2; i++) { // 2: Half of the total
119 g_kvDelegatePtr->Delete(entries[i].key);
120 Value resultvalue;
121 EXPECT_TRUE(g_kvDelegatePtr->Get(entries[i].key, resultvalue) == NOT_FOUND);
122 }
123 for (int i = totalSize / 2; i < totalSize; i++) {
124 Value value = entries[i].value;
125 value.push_back('x');
126 EXPECT_TRUE(g_kvDelegatePtr->Put(entries[i].key, value) == OK);
127 Value resultvalue;
128 EXPECT_TRUE(g_kvDelegatePtr->Get(entries[i].key, resultvalue) == OK);
129 EXPECT_TRUE(resultvalue == value);
130 }
131 }
132
DataSync005()133 void DataSync005()
134 {
135 ASSERT_NE(g_communicatorAggregator, nullptr);
136 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
137 ASSERT_TRUE(dataSync != nullptr);
138 dataSync->SendSaveDataNotifyPacket(nullptr, 0, 0, 0, TIME_SYNC_MESSAGE);
139 EXPECT_EQ(g_communicatorAggregator->GetOnlineDevices().size(), 3u); // 3 online dev
140 delete dataSync;
141 }
142
DataSync008()143 void DataSync008()
144 {
145 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
146 ASSERT_TRUE(dataSync != nullptr);
147 auto context = new (std::nothrow) MockSyncTaskContext();
148 dataSync->PutDataMsg(nullptr);
149 bool isNeedHandle = false;
150 bool isContinue = false;
151 EXPECT_EQ(dataSync->MoveNextDataMsg(context, isNeedHandle, isContinue), nullptr);
152 EXPECT_EQ(isNeedHandle, false);
153 EXPECT_EQ(isContinue, false);
154 delete dataSync;
155 delete context;
156 }
157
ReSetWaterDogTest001()158 void ReSetWaterDogTest001()
159 {
160 /**
161 * @tc.steps: step1. put 10 key/value
162 * @tc.expected: step1, put return OK.
163 */
164 for (int i = 0; i < 5; i++) { // put 5 key
165 Key key = DistributedDBToolsUnitTest::GetRandPrefixKey({'a', 'b'}, 1024); // rand num 1024 for test
166 Value value;
167 DistributedDBToolsUnitTest::GetRandomKeyValue(value, 10 * 50 * 1024u); // 10 * 50 * 1024 = 500k
168 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
169 }
170 /**
171 * @tc.steps: step2. SetDeviceMtuSize
172 * @tc.expected: step2, return OK.
173 */
174 g_communicatorAggregator->SetDeviceMtuSize(DEVICE_A, 50 * 1024u); // 50 * 1024u = 50k
175 g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, 50 * 1024u); // 50 * 1024u = 50k
176 /**
177 * @tc.steps: step3. deviceA,deviceB sync to each other at same time
178 * @tc.expected: step3. sync should return OK.
179 */
180 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, true), E_OK);
181 g_communicatorAggregator->SetDeviceMtuSize(DEVICE_A, 5 * 1024u * 1024u); // 5 * 1024u * 1024u = 5m
182 g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, 5 * 1024u * 1024u); // 5 * 1024u * 1024u = 5m
183 }
184 }
185
186 class DistributedDBSingleVerP2PComplexSyncTest : public testing::Test {
187 public:
188 static void SetUpTestCase(void);
189 static void TearDownTestCase(void);
190 void SetUp();
191 void TearDown();
192 };
193
SetUpTestCase(void)194 void DistributedDBSingleVerP2PComplexSyncTest::SetUpTestCase(void)
195 {
196 /**
197 * @tc.setup: Init datadir and Virtual Communicator.
198 */
199 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
200 g_config.dataDir = g_testDir;
201 g_mgr.SetKvStoreConfig(g_config);
202
203 string dir = g_testDir + "/single_ver";
204 DIR* dirTmp = opendir(dir.c_str());
205 if (dirTmp == nullptr) {
206 OS::MakeDBDirectory(dir);
207 } else {
208 closedir(dirTmp);
209 }
210
211 g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
212 ASSERT_TRUE(g_communicatorAggregator != nullptr);
213 RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
214 }
215
TearDownTestCase(void)216 void DistributedDBSingleVerP2PComplexSyncTest::TearDownTestCase(void)
217 {
218 /**
219 * @tc.teardown: Release virtual Communicator and clear data dir.
220 */
221 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
222 LOGE("rm test db files error!");
223 }
224 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
225 }
226
SetUp(void)227 void DistributedDBSingleVerP2PComplexSyncTest::SetUp(void)
228 {
229 DistributedDBToolsUnitTest::PrintTestCaseInfo();
230 /**
231 * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
232 */
233 KvStoreNbDelegate::Option option;
234 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
235 ASSERT_TRUE(g_kvDelegateStatus == OK);
236 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
237 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
238 ASSERT_TRUE(g_deviceB != nullptr);
239 VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
240 ASSERT_TRUE(syncInterfaceB != nullptr);
241 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
242
243 g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
244 ASSERT_TRUE(g_deviceC != nullptr);
245 VirtualSingleVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
246 ASSERT_TRUE(syncInterfaceC != nullptr);
247 ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
248
249 auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
250 const std::string &deviceId, uint8_t flag) -> bool {
251 return true;
252 };
253 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
254 }
255
TearDown(void)256 void DistributedDBSingleVerP2PComplexSyncTest::TearDown(void)
257 {
258 /**
259 * @tc.teardown: Release device A, B, C
260 */
261 if (g_kvDelegatePtr != nullptr) {
262 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
263 g_kvDelegatePtr = nullptr;
264 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
265 LOGD("delete kv store status %d", status);
266 ASSERT_TRUE(status == OK);
267 }
268 if (g_deviceB != nullptr) {
269 delete g_deviceB;
270 g_deviceB = nullptr;
271 }
272 if (g_deviceC != nullptr) {
273 delete g_deviceC;
274 g_deviceC = nullptr;
275 }
276 PermissionCheckCallbackV2 nullCallback;
277 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
278 }
279
280 /**
281 * @tc.name: SaveDataNotify001
282 * @tc.desc: Test SaveDataNotify function, delay < 30s should sync ok, > 36 should timeout
283 * @tc.type: FUNC
284 * @tc.require: AR000D4876
285 * @tc.author: xushaohua
286 */
287 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SaveDataNotify001, TestSize.Level3)
288 {
289 DBStatus status = OK;
290 const int waitFiveSeconds = 5000;
291 const int waitThirtySeconds = 30000;
292 const int waitThirtySixSeconds = 36000;
293 std::vector<std::string> devices;
294 devices.push_back(g_deviceB->GetDeviceId());
295
296 /**
297 * @tc.steps: step1. deviceA put {k1, v1}
298 */
299 Key key = {'1'};
300 Value value = {'1'};
301 status = g_kvDelegatePtr->Put(key, value);
302 ASSERT_TRUE(status == OK);
303
304 /**
305 * @tc.steps: step2. deviceB set sava data dely 5s
306 */
307 g_deviceB->SetSaveDataDelayTime(waitFiveSeconds);
308
309 /**
310 * @tc.steps: step3. deviceA call sync and wait
311 * @tc.expected: step3. sync should return OK. onComplete should be called, deviceB sync success.
312 */
313 std::map<std::string, DBStatus> result;
314 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
315 ASSERT_TRUE(status == OK);
316 ASSERT_TRUE(result.size() == devices.size());
317 ASSERT_TRUE(result[DEVICE_B] == OK);
318
319 /**
320 * @tc.steps: step4. deviceB set sava data dely 30s and put {k1, v1}
321 */
322 g_deviceB->SetSaveDataDelayTime(waitThirtySeconds);
323 status = g_kvDelegatePtr->Put(key, value);
324 ASSERT_TRUE(status == OK);
325 /**
326 * @tc.steps: step3. deviceA call sync and wait
327 * @tc.expected: step3. sync should return OK. onComplete should be called, deviceB sync success.
328 */
329 result.clear();
330 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
331 ASSERT_TRUE(status == OK);
332 ASSERT_TRUE(result.size() == devices.size());
333 ASSERT_TRUE(result[DEVICE_B] == OK);
334
335 /**
336 * @tc.steps: step4. deviceB set sava data dely 36s and put {k1, v1}
337 */
338 g_deviceB->SetSaveDataDelayTime(waitThirtySixSeconds);
339 status = g_kvDelegatePtr->Put(key, value);
340 ASSERT_TRUE(status == OK);
341 /**
342 * @tc.steps: step5. deviceA call sync and wait
343 * @tc.expected: step5. sync should return OK. onComplete should be called, deviceB sync TIME_OUT.
344 */
345 result.clear();
346 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
347 ASSERT_TRUE(status == OK);
348 ASSERT_TRUE(result.size() == devices.size());
349 ASSERT_TRUE(result[DEVICE_B] == TIME_OUT);
350 }
351
352 /**
353 * @tc.name: SametimeSync001
354 * @tc.desc: Test 2 device sync with each other
355 * @tc.type: FUNC
356 * @tc.require: AR000CCPOM
357 * @tc.author: zhangqiquan
358 */
359 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SametimeSync001, TestSize.Level3)
360 {
361 DBStatus status = OK;
362 std::vector<std::string> devices;
363 devices.push_back(g_deviceB->GetDeviceId());
364
365 int responseCount = 0;
366 int requestCount = 0;
367 Key key = {'1'};
368 Value value = {'1'};
369 /**
370 * @tc.steps: step1. make sure deviceB send pull firstly and response_pull secondly
371 * @tc.expected: step1. deviceA put data when finish push task. put data should return OK.
372 */
373 g_communicatorAggregator->RegOnDispatch([&responseCount, &requestCount, &key, &value](
__anon61e499a10302( const std::string &target, DistributedDB::Message *msg) 374 const std::string &target, DistributedDB::Message *msg) {
375 if (target != "real_device" || msg->GetMessageId() != DATA_SYNC_MESSAGE) {
376 return;
377 }
378
379 if (msg->GetMessageType() == TYPE_RESPONSE) {
380 responseCount++;
381 if (responseCount == 1) { // 1 is the ack which B response A's push task
382 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), DBStatus::OK);
383 std::this_thread::sleep_for(std::chrono::seconds(1));
384 } else if (responseCount == 2) { // 2 is the ack which B response A's response_pull task
385 msg->SetErrorNo(E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
386 }
387 } else if (msg->GetMessageType() == TYPE_REQUEST) {
388 requestCount++;
389 if (requestCount == 1) { // 1 is A push task
390 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2 sec
391 }
392 }
393 });
394 /**
395 * @tc.steps: step2. deviceA,deviceB sync to each other at same time
396 * @tc.expected: step2. sync should return OK.
397 */
398 std::map<std::string, DBStatus> result;
__anon61e499a10402null399 std::thread subThread([]{
400 g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, true);
401 });
402 status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_PULL, result);
403 subThread.join();
404 g_communicatorAggregator->RegOnDispatch(nullptr);
405
406 EXPECT_TRUE(status == OK);
407 ASSERT_TRUE(result.size() == devices.size());
408 EXPECT_TRUE(result[DEVICE_B] == OK);
409 Value actualValue;
410 g_kvDelegatePtr->Get(key, actualValue);
411 EXPECT_EQ(actualValue, value);
412 }
413
414 /**
415 * @tc.name: SametimeSync002
416 * @tc.desc: Test 2 device sync with each other with water error
417 * @tc.type: FUNC
418 * @tc.require: AR000CCPOM
419 * @tc.author: zhangqiquan
420 */
421 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SametimeSync002, TestSize.Level3)
422 {
423 DBStatus status = OK;
424 std::vector<std::string> devices;
425 devices.push_back(g_deviceB->GetDeviceId());
426 g_kvDelegatePtr->Put({'k', '1'}, {'v', '1'});
427 /**
428 * @tc.steps: step1. make sure deviceA push data failed and increase water mark
429 * @tc.expected: step1. deviceA push failed with timeout
430 */
__anon61e499a10502(const std::string &target, DistributedDB::Message *msg) 431 g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
432 ASSERT_NE(msg, nullptr);
433 if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
434 msg->SetMessageId(INVALID_MESSAGE_ID);
435 }
436 });
437 std::map<std::string, DBStatus> result;
__anon61e499a10602(const std::map<std::string, DBStatus> &map) 438 auto callback = [&result](const std::map<std::string, DBStatus> &map) {
439 result = map;
440 };
441 Query query = Query::Select().PrefixKey({'k', '1'});
442 EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
443 ASSERT_TRUE(result.size() == devices.size());
444 EXPECT_TRUE(result[DEVICE_B] == TIME_OUT);
445 /**
446 * @tc.steps: step2. A push to B with query2, sleep 1s for waiting step3
447 * @tc.expected: step2. sync should return OK.
448 */
__anon61e499a10702(const std::string &target, DistributedDB::Message *msg) 449 g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
450 ASSERT_NE(msg, nullptr);
451 if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
452 std::this_thread::sleep_for(std::chrono::seconds(1));
453 }
454 });
__anon61e499a10802null455 std::thread subThread([&devices] {
456 std::map<std::string, DBStatus> result;
457 auto callback = [&result](const std::map<std::string, DBStatus> &map) {
458 result = map;
459 };
460 Query query = Query::Select().PrefixKey({'k', '2'});
461 LOGD("Begin PUSH");
462 EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
463 ASSERT_TRUE(result.size() == devices.size());
464 EXPECT_TRUE(result[DEVICE_A] == OK);
465 });
466 /**
467 * @tc.steps: step3. B pull to A when A is in push task
468 * @tc.expected: step3. sync should return OP_FINISHED_ALL.
469 */
470 std::this_thread::sleep_for(std::chrono::milliseconds(100));
471 std::map<std::string, int> virtualResult;
472 g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, query,
__anon61e499a10a02(const std::map<std::string, int> &map) 473 [&virtualResult](const std::map<std::string, int> &map) {
474 virtualResult = map;
475 }, true);
476 EXPECT_TRUE(status == OK);
477 ASSERT_EQ(virtualResult.size(), devices.size());
478 EXPECT_EQ(virtualResult[DEVICE_A], SyncOperation::OP_FINISHED_ALL);
479 g_communicatorAggregator->RegOnDispatch(nullptr);
480 subThread.join();
481 }
482
483 /**
484 * @tc.name: DatabaseOnlineCallback001
485 * @tc.desc: check database status notify online callback
486 * @tc.type: FUNC
487 * @tc.require: AR000CQS3S SR000CQE0B
488 * @tc.author: zhuwentao
489 */
490 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DatabaseOnlineCallback001, TestSize.Level1)
491 {
492 /**
493 * @tc.steps: step1. SetStoreStatusNotifier
494 * @tc.expected: step1. SetStoreStatusNotifier ok
495 */
496 std::string targetDev = "DEVICE_X";
497 bool isCheckOk = false;
498 auto databaseStatusNotifyCallback = [targetDev, &isCheckOk] (const std::string &userId,
__anon61e499a10b02(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) 499 const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) -> void {
500 if (userId == USER_ID && appId == APP_ID && storeId == STORE_ID && deviceId == targetDev &&
501 onlineStatus == true) {
502 isCheckOk = true;
503 }};
504 g_mgr.SetStoreStatusNotifier(databaseStatusNotifyCallback);
505 /**
506 * @tc.steps: step2. trigger device online
507 * @tc.expected: step2. check callback ok
508 */
509 g_communicatorAggregator->OnlineDevice(targetDev);
510 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME / 20));
511 EXPECT_EQ(isCheckOk, true);
512 StoreStatusNotifier nullCallback;
513 g_mgr.SetStoreStatusNotifier(nullCallback);
514 }
515
516 /**
517 * @tc.name: DatabaseOfflineCallback001
518 * @tc.desc: check database status notify online callback
519 * @tc.type: FUNC
520 * @tc.require: AR000CQS3S SR000CQE0B
521 * @tc.author: zhuwentao
522 */
523 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DatabaseOfflineCallback001, TestSize.Level1)
524 {
525 /**
526 * @tc.steps: step1. SetStoreStatusNotifier
527 * @tc.expected: step1. SetStoreStatusNotifier ok
528 */
529 std::string targetDev = "DEVICE_X";
530 bool isCheckOk = false;
531 auto databaseStatusNotifyCallback = [targetDev, &isCheckOk] (const std::string &userId,
__anon61e499a10c02(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) 532 const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) -> void {
533 if (userId == USER_ID && appId == APP_ID && storeId == STORE_ID && deviceId == targetDev &&
534 onlineStatus == false) {
535 isCheckOk = true;
536 }};
537 g_mgr.SetStoreStatusNotifier(databaseStatusNotifyCallback);
538 /**
539 * @tc.steps: step2. trigger device offline
540 * @tc.expected: step2. check callback ok
541 */
542 g_communicatorAggregator->OfflineDevice(targetDev);
543 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME / 20));
544 EXPECT_EQ(isCheckOk, true);
545 StoreStatusNotifier nullCallback;
546 g_mgr.SetStoreStatusNotifier(nullCallback);
547 }
548
549 /**
550 * @tc.name: CloseSync001
551 * @tc.desc: Test 2 delegate close when sync
552 * @tc.type: FUNC
553 * @tc.require: AR000CCPOM
554 * @tc.author: zhangqiquan
555 */
556 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, CloseSync001, TestSize.Level3)
557 {
558 DBStatus status = OK;
559 std::vector<std::string> devices;
560 devices.push_back(g_deviceB->GetDeviceId());
561
562 /**
563 * @tc.steps: step1. make sure A sync start
564 */
565 bool sleep = false;
__anon61e499a10d02(const std::string &target, DistributedDB::Message *msg) 566 g_communicatorAggregator->RegOnDispatch([&sleep](const std::string &target, DistributedDB::Message *msg) {
567 if (!sleep) {
568 sleep = true;
569 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s for waiting close db
570 }
571 });
572
573 KvStoreNbDelegate* kvDelegatePtrA = nullptr;
574 KvStoreNbDelegate::Option option;
__anon61e499a10e02(DBStatus s, KvStoreNbDelegate *delegate) 575 g_mgr.GetKvStore(STORE_ID, option, [&status, &kvDelegatePtrA](DBStatus s, KvStoreNbDelegate *delegate) {
576 status = s;
577 kvDelegatePtrA = delegate;
578 });
579 EXPECT_EQ(status, OK);
580 EXPECT_NE(kvDelegatePtrA, nullptr);
581
582 Key key = {'k'};
583 Value value = {'v'};
584 kvDelegatePtrA->Put(key, value);
585 std::map<std::string, DBStatus> result;
__anon61e499a10f02(const std::map<std::string, DBStatus>& statusMap) 586 auto callback = [&result](const std::map<std::string, DBStatus>& statusMap) {
587 result = statusMap;
588 };
589 /**
590 * @tc.steps: step2. deviceA sync and then close
591 * @tc.expected: step2. sync should abort and data don't exist in B
592 */
__anon61e499a11002() 593 std::thread closeThread([&kvDelegatePtrA]() {
594 std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for waiting sync start
595 EXPECT_EQ(g_mgr.CloseKvStore(kvDelegatePtrA), OK);
596 });
597 EXPECT_EQ(kvDelegatePtrA->Sync(devices, SYNC_MODE_PUSH_ONLY, callback, false), OK);
598 LOGD("Sync finish");
599 closeThread.join();
600 std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s for waiting sync finish
601 EXPECT_EQ(result.size(), 0u);
602 VirtualDataItem actualValue;
603 EXPECT_EQ(g_deviceB->GetData(key, actualValue), -E_NOT_FOUND);
604 g_communicatorAggregator->RegOnDispatch(nullptr);
605 }
606
607 /**
608 * @tc.name: CloseSync002
609 * @tc.desc: Test 1 delegate close when in time sync
610 * @tc.type: FUNC
611 * @tc.require: AR000CCPOM
612 * @tc.author: zhangqiquan
613 */
614 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, CloseSync002, TestSize.Level3)
615 {
616 /**
617 * @tc.steps: step1. invalid time sync packet from A
618 */
__anon61e499a11102(const std::string &target, DistributedDB::Message *msg) 619 g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
620 ASSERT_NE(msg, nullptr);
621 if (target == DEVICE_B && msg->GetMessageId() == TIME_SYNC_MESSAGE && msg->GetMessageType() == TYPE_REQUEST) {
622 msg->SetMessageId(INVALID_MESSAGE_ID);
623 LOGD("Message is invalid");
624 }
625 });
626 Timestamp currentTime;
627 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
628 g_deviceB->PutData({'k'}, {'v'}, currentTime, 0);
629
630 /**
631 * @tc.steps: step2. B PUSH to A and A close after 1s
632 * @tc.expected: step2. A closing time cost letter than 4s
633 */
__anon61e499a11202() 634 std::thread closingThread([]() {
635 std::this_thread::sleep_for(std::chrono::seconds(1));
636 LOGD("Begin Close");
637 Timestamp beginTime;
638 (void)OS::GetCurrentSysTimeInMicrosecond(beginTime);
639 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
640 Timestamp endTime;
641 (void)OS::GetCurrentSysTimeInMicrosecond(endTime);
642 EXPECT_LE(static_cast<int>(endTime - beginTime), 4 * 1000 * 1000); // waiting 4 * 1000 * 1000 us
643 LOGD("End Close");
644 });
645 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
646 closingThread.join();
647
648 /**
649 * @tc.steps: step3. remove db
650 * @tc.expected: step3. remove ok
651 */
652 g_kvDelegatePtr = nullptr;
653 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
654 LOGD("delete kv store status %d", status);
655 ASSERT_TRUE(status == OK);
656 g_communicatorAggregator->RegOnDispatch(nullptr);
657 }
658
659 /**
660 * @tc.name: OrderbyWriteTimeSync001
661 * @tc.desc: sync query with order by writeTime
662 * @tc.type: FUNC
663 * @tc.require: AR000H5VLO
664 * @tc.author: zhuwentao
665 */
666 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, OrderbyWriteTimeSync001, TestSize.Level0)
667 {
668 /**
669 * @tc.steps: step1. deviceA subscribe query with order by write time
670 * * @tc.expected: step1. interface return not support
671 */
672 std::vector<std::string> devices;
673 devices.push_back(g_deviceB->GetDeviceId());
674 Query query = Query::Select().PrefixKey({'k'}).OrderByWriteTime(true);
675 EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, nullptr, query, true), NOT_SUPPORT);
676 }
677
678
679 /**
680 * @tc.name: Device Offline Sync 001
681 * @tc.desc: Test push sync when device offline
682 * @tc.type: FUNC
683 * @tc.require: AR000CCPOM
684 * @tc.author: xushaohua
685 */
686 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSync001, TestSize.Level1)
687 {
688 std::vector<std::string> devices;
689 devices.push_back(g_deviceB->GetDeviceId());
690 devices.push_back(g_deviceC->GetDeviceId());
691
692 /**
693 * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2}, {k3 delete}, {k4,v2}
694 */
695 Key key1 = {'1'};
696 Value value1 = {'1'};
697 ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value1) == OK);
698
699 Key key2 = {'2'};
700 Value value2 = {'2'};
701 ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value2) == OK);
702
703 Key key3 = {'3'};
704 Value value3 = {'3'};
705 ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value3) == OK);
706 ASSERT_TRUE(g_kvDelegatePtr->Delete(key3) == OK);
707
708 Key key4 = {'4'};
709 Value value4 = {'4'};
710 ASSERT_TRUE(g_kvDelegatePtr->Put(key4, value4) == OK);
711
712 /**
713 * @tc.steps: step2. deviceB offline
714 */
715 g_deviceB->Offline();
716
717 /**
718 * @tc.steps: step3. deviceA call pull sync
719 * @tc.expected: step3. sync should return OK.
720 */
721 std::map<std::string, DBStatus> result;
722 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
723 ASSERT_TRUE(status == OK);
724
725 /**
726 * @tc.expected: step3. onComplete should be called, DeviceB status is timeout
727 * deviceC has {k1, v1}, {k2, v2}, {k3 delete}, {k4,v4}
728 */
729 for (const auto &pair : result) {
730 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
731 if (pair.first == DEVICE_B) {
732 EXPECT_TRUE(pair.second == COMM_FAILURE);
733 } else {
734 EXPECT_TRUE(pair.second == OK);
735 }
736 }
737 VirtualDataItem item;
738 g_deviceC->GetData(key1, item);
739 EXPECT_TRUE(item.value == value1);
740 item.value.clear();
741 g_deviceC->GetData(key2, item);
742 EXPECT_TRUE(item.value == value2);
743 item.value.clear();
744 Key hashKey;
745 DistributedDBToolsUnitTest::CalcHash(key3, hashKey);
746 EXPECT_TRUE(g_deviceC->GetData(hashKey, item) == -E_NOT_FOUND);
747 item.value.clear();
748 g_deviceC->GetData(key4, item);
749 EXPECT_TRUE(item.value == value4);
750 }
751
752 /**
753 * @tc.name: Device Offline Sync 002
754 * @tc.desc: Test pull sync when device offline
755 * @tc.type: FUNC
756 * @tc.require: AR000CCPOM
757 * @tc.author: xushaohua
758 */
759 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSync002, TestSize.Level1)
760 {
761 std::vector<std::string> devices;
762 devices.push_back(g_deviceB->GetDeviceId());
763 devices.push_back(g_deviceC->GetDeviceId());
764
765 /**
766 * @tc.steps: step1. deviceB put {k1, v1}
767 */
768 Key key1 = {'1'};
769 Value value1 = {'1'};
770 g_deviceB->PutData(key1, value1, 0, 0);
771
772 /**
773 * @tc.steps: step2. deviceB offline
774 */
775 g_deviceB->Offline();
776
777 /**
778 * @tc.steps: step3. deviceC put {k2, v2}, {k3, delete}, {k4, v4}
779 */
780 Key key2 = {'2'};
781 Value value2 = {'2'};
782 g_deviceC->PutData(key2, value2, 0, 0);
783
784 Key key3 = {'3'};
785 Value value3 = {'3'};
786 g_deviceC->PutData(key3, value3, 0, 1);
787
788 Key key4 = {'4'};
789 Value value4 = {'4'};
790 g_deviceC->PutData(key4, value4, 0, 0);
791
792 /**
793 * @tc.steps: step2. deviceA call pull sync
794 * @tc.expected: step2. sync should return OK.
795 */
796 std::map<std::string, DBStatus> result;
797 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
798 ASSERT_TRUE(status == OK);
799
800 /**
801 * @tc.expected: step3. onComplete should be called, DeviceB status is timeout
802 * deviceA has {k2, v2}, {k3 delete}, {k4,v4}
803 */
804 for (const auto &pair : result) {
805 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
806 if (pair.first == DEVICE_B) {
807 EXPECT_TRUE(pair.second == COMM_FAILURE);
808 } else {
809 EXPECT_TRUE(pair.second == OK);
810 }
811 }
812
813 Value value5;
814 EXPECT_TRUE(g_kvDelegatePtr->Get(key1, value5) != OK);
815 g_kvDelegatePtr->Get(key2, value5);
816 EXPECT_EQ(value5, value2);
817 EXPECT_TRUE(g_kvDelegatePtr->Get(key3, value5) != OK);
818 g_kvDelegatePtr->Get(key4, value5);
819 EXPECT_EQ(value5, value4);
820 }
821
822 /**
823 * @tc.name: EncryptedAlgoUpgrade001
824 * @tc.desc: Test upgrade encrypted db can sync normally
825 * @tc.type: FUNC
826 * @tc.require: AR000HI2JS
827 * @tc.author: zhuwentao
828 */
829 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, EncryptedAlgoUpgrade001, TestSize.Level3)
830 {
831 /**
832 * @tc.steps: step1. clear db
833 * * @tc.expected: step1. interface return ok
834 */
835 if (g_kvDelegatePtr != nullptr) {
836 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
837 g_kvDelegatePtr = nullptr;
838 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
839 LOGD("delete kv store status %d", status);
840 ASSERT_TRUE(status == OK);
841 }
842
843 CipherPassword passwd;
844 std::vector<uint8_t> passwdVect = {'p', 's', 'd', '1'};
845 passwd.SetValue(passwdVect.data(), passwdVect.size());
846 /**
847 * @tc.steps: step2. open old db by sql
848 * * @tc.expected: step2. interface return ok
849 */
850 std::string identifier = DBCommon::GenerateIdentifierId(STORE_ID, APP_ID, USER_ID);
851 std::string hashDir = DBCommon::TransferHashString(identifier);
852 std::string hexHashDir = DBCommon::TransferStringToHex(hashDir);
853 std::string dbPath = g_testDir + "/" + hexHashDir + "/single_ver";
854 ASSERT_TRUE(DBCommon::CreateDirectory(g_testDir + "/" + hexHashDir) == OK);
855 ASSERT_TRUE(DBCommon::CreateDirectory(dbPath) == OK);
856 std::vector<std::string> dbDir {DBConstant::MAINDB_DIR, DBConstant::METADB_DIR, DBConstant::CACHEDB_DIR};
857 for (const auto &item : dbDir) {
858 ASSERT_TRUE(DBCommon::CreateDirectory(dbPath + "/" + item) == OK);
859 }
860 uint64_t flag = SQLITE_OPEN_URI | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
861 sqlite3 *db;
862 std::string fileUrl = dbPath + "/" + DBConstant::MAINDB_DIR + "/" + DBConstant::SINGLE_VER_DATA_STORE + ".db";
863 ASSERT_TRUE(sqlite3_open_v2(fileUrl.c_str(), &db, flag, nullptr) == SQLITE_OK);
864 SQLiteUtils::SetKeyInner(db, CipherType::AES_256_GCM, passwd, DBConstant::DEFAULT_ITER_TIMES);
865 /**
866 * @tc.steps: step3. create table and close
867 * * @tc.expected: step3. interface return ok
868 */
869 ASSERT_TRUE(SQLiteUtils::ExecuteRawSQL(db, CREATE_SYNC_TABLE_SQL) == E_OK);
870 sqlite3_close_v2(db);
871 db = nullptr;
872 LOGI("create old db success");
873 /**
874 * @tc.steps: step4. get kvstore
875 * * @tc.expected: step4. interface return ok
876 */
877 KvStoreNbDelegate::Option option;
878 option.isEncryptedDb = true;
879 option.cipher = CipherType::AES_256_GCM;
880 option.passwd = passwd;
881 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
882 ASSERT_TRUE(g_kvDelegateStatus == OK);
883 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
884 /**
885 * @tc.steps: step5. sync ok
886 * * @tc.expected: step5. interface return ok
887 */
888 PullSyncTest();
889 /**
890 * @tc.steps: step5. crud ok
891 * * @tc.expected: step5. interface return ok
892 */
893 CrudTest();
894 }
895
896 /**
897 * @tc.name: RemoveDeviceData002
898 * @tc.desc: test remove device data before sync
899 * @tc.type: FUNC
900 * @tc.require:
901 * @tc.author: zhuwentao
902 */
903 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RemoveDeviceData002, TestSize.Level1)
904 {
905 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
906 /**
907 * @tc.steps: step1. sync deviceB data to A and check data
908 * * @tc.expected: step1. interface return ok
909 */
910 Key key1 = {'1'};
911 Key key2 = {'2'};
912 Value value = {'1'};
913 Timestamp currentTime;
914 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
915 EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
916 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
917 EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
918 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
919 Value actualValue;
920 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
921 EXPECT_EQ(actualValue, value);
922 actualValue.clear();
923 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
924 EXPECT_EQ(actualValue, value);
925 /**
926 * @tc.steps: step2. call RemoveDeviceData
927 * * @tc.expected: step2. interface return ok
928 */
929 g_kvDelegatePtr->RemoveDeviceData(g_deviceB->GetDeviceId());
930 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
931 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), NOT_FOUND);
932 /**
933 * @tc.steps: step3. sync to device A again and check data
934 * * @tc.expected: step3. sync ok
935 */
936 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
937 actualValue.clear();
938 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
939 EXPECT_EQ(actualValue, value);
940 actualValue.clear();
941 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
942 EXPECT_EQ(actualValue, value);
943 }
944
945 /**
946 * @tc.name: DataSync001
947 * @tc.desc: Test Data Sync when Initialize
948 * @tc.type: FUNC
949 * @tc.require: AR000HI2JS
950 * @tc.author: zhuwentao
951 */
952 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync001, TestSize.Level1)
953 {
954 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
955 ASSERT_TRUE(dataSync != nullptr);
956 std::shared_ptr<Metadata> inMetadata = nullptr;
957 std::string deviceId;
958 Message message;
959 VirtualSingleVerSyncDBInterface tmpInterface;
960 VirtualCommunicator tmpCommunicator(deviceId, g_communicatorAggregator);
961 EXPECT_EQ(dataSync->Initialize(nullptr, nullptr, inMetadata, deviceId), -E_INVALID_ARGS);
962 EXPECT_EQ(dataSync->Initialize(&tmpInterface, nullptr, inMetadata, deviceId), -E_INVALID_ARGS);
963 EXPECT_EQ(dataSync->Initialize(&tmpInterface, &tmpCommunicator, inMetadata, deviceId), -E_INVALID_ARGS);
964 delete dataSync;
965 }
966
967 /**
968 * @tc.name: DataSync002
969 * @tc.desc: Test active sync with invalid param in DataSync Class
970 * @tc.type: FUNC
971 * @tc.require: AR000HI2JS
972 * @tc.author: zhuwentao
973 */
974 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync002, TestSize.Level1)
975 {
976 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
977 ASSERT_TRUE(dataSync != nullptr);
978 Message message;
979 EXPECT_EQ(dataSync->TryContinueSync(nullptr, &message), -E_INVALID_ARGS);
980 EXPECT_EQ(dataSync->TryContinueSync(nullptr, nullptr), -E_INVALID_ARGS);
981 EXPECT_EQ(dataSync->PushStart(nullptr), -E_INVALID_ARGS);
982 EXPECT_EQ(dataSync->PushPullStart(nullptr), -E_INVALID_ARGS);
983 EXPECT_EQ(dataSync->PullRequestStart(nullptr), -E_INVALID_ARGS);
984 EXPECT_EQ(dataSync->PullResponseStart(nullptr), -E_INVALID_ARGS);
985 delete dataSync;
986 }
987
988 /**
989 * @tc.name: DataSync003
990 * @tc.desc: Test receive invalid request data packet in DataSync Class
991 * @tc.type: FUNC
992 * @tc.require: AR000HI2JS
993 * @tc.author: zhuwentao
994 */
995 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync003, TestSize.Level1)
996 {
997 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
998 ASSERT_TRUE(dataSync != nullptr);
999 uint64_t tmpMark = 0;
1000 Message message;
1001 EXPECT_EQ(dataSync->DataRequestRecv(nullptr, nullptr, tmpMark), -E_INVALID_ARGS);
1002 EXPECT_EQ(dataSync->DataRequestRecv(nullptr, &message, tmpMark), -E_INVALID_ARGS);
1003 delete dataSync;
1004 }
1005
1006 /**
1007 * @tc.name: DataSync004
1008 * @tc.desc: Test receive invalid ack packet in DataSync Class
1009 * @tc.type: FUNC
1010 * @tc.require: AR000HI2JS
1011 * @tc.author: zhuwentao
1012 */
1013 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync004, TestSize.Level1)
1014 {
1015 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1016 ASSERT_TRUE(dataSync != nullptr);
1017 Message message;
1018 TestSingleVerKvSyncTaskContext tmpContext;
1019 EXPECT_EQ(dataSync->AckPacketIdCheck(nullptr), false);
1020 EXPECT_EQ(dataSync->AckPacketIdCheck(&message), false);
1021 EXPECT_EQ(dataSync->AckRecv(&tmpContext, nullptr), -E_INVALID_ARGS);
1022 EXPECT_EQ(dataSync->AckRecv(nullptr, nullptr), -E_INVALID_ARGS);
1023 EXPECT_EQ(dataSync->AckRecv(nullptr, &message), -E_INVALID_ARGS);
1024 delete dataSync;
1025 }
1026
1027 /**
1028 * @tc.name: DataSync005
1029 * @tc.desc: Test receive invalid notify packet in DataSync Class
1030 * @tc.type: FUNC
1031 * @tc.require: AR000HI2JS
1032 * @tc.author: zhuwentao
1033 */
1034 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync005, TestSize.Level1)
1035 {
1036 ASSERT_NO_FATAL_FAILURE(DataSync005());
1037 }
1038
1039 /**
1040 * @tc.name: DataSync006
1041 * @tc.desc: Test control start with invalid param in DataSync Class
1042 * @tc.type: FUNC
1043 * @tc.require: AR000HI2JS
1044 * @tc.author: zhuwentao
1045 */
1046 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync006, TestSize.Level1)
1047 {
1048 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1049 ASSERT_TRUE(dataSync != nullptr);
1050 TestSingleVerKvSyncTaskContext tmpContext;
1051 EXPECT_EQ(dataSync->ControlCmdStart(nullptr), -E_INVALID_ARGS);
1052 EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_INVALID_ARGS);
1053 std::shared_ptr<SubscribeManager> subManager = std::make_shared<SubscribeManager>();
1054 tmpContext.SetSubscribeManager(subManager);
1055 tmpContext.SetMode(SyncModeType::INVALID_MODE);
1056 EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_INVALID_ARGS);
1057 std::set<Key> Keys = {{'a'}, {'b'}};
1058 Query query = Query::Select().InKeys(Keys);
1059 QuerySyncObject innerQuery(query);
1060 tmpContext.SetQuery(innerQuery);
1061 tmpContext.SetMode(SyncModeType::SUBSCRIBE_QUERY);
1062 EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_NOT_SUPPORT);
1063 delete dataSync;
1064 subManager = nullptr;
1065 }
1066
1067 /**
1068 * @tc.name: DataSync007
1069 * @tc.desc: Test receive invalid control packet in DataSync Class
1070 * @tc.type: FUNC
1071 * @tc.require: AR000HI2JS
1072 * @tc.author: zhuwentao
1073 */
1074 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync007, TestSize.Level1)
1075 {
1076 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1077 ASSERT_TRUE(dataSync != nullptr);
1078 Message message;
1079 ControlRequestPacket packet;
1080 TestSingleVerKvSyncTaskContext tmpContext;
1081 EXPECT_EQ(dataSync->ControlCmdRequestRecv(nullptr, &message), -E_INVALID_ARGS);
1082 message.SetCopiedObject(packet);
1083 EXPECT_EQ(dataSync->ControlCmdRequestRecv(nullptr, &message), -E_INVALID_ARGS);
1084 delete dataSync;
1085 }
1086
1087 /**
1088 * @tc.name: DataSync008
1089 * @tc.desc: Test pull null msg in dataQueue in DataSync Class
1090 * @tc.type: FUNC
1091 * @tc.require: AR000HI2JS
1092 * @tc.author: zhuwentao
1093 */
1094 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync008, TestSize.Level1)
1095 {
1096 ASSERT_NO_FATAL_FAILURE(DataSync008());
1097 }
1098
1099 /**
1100 * @tc.name: SyncRetry001
1101 * @tc.desc: use sync retry sync use push
1102 * @tc.type: FUNC
1103 * @tc.require: AR000CKRTD AR000CQE0E
1104 * @tc.author: zhuwentao
1105 */
1106 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry001, TestSize.Level3)
1107 {
1108 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1109 std::vector<std::string> devices;
1110 devices.push_back(g_deviceB->GetDeviceId());
1111
1112 /**
1113 * @tc.steps: step1. set sync retry
1114 * @tc.expected: step1, Pragma return OK.
1115 */
1116 int pragmaData = 1;
1117 PragmaData input = static_cast<PragmaData>(&pragmaData);
1118 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1119
1120 /**
1121 * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
1122 */
1123 ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
1124
1125 /**
1126 * @tc.steps: step3. deviceA call sync and wait
1127 * @tc.expected: step3. sync should return OK.
1128 */
1129 std::map<std::string, DBStatus> result;
1130 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1131
1132 /**
1133 * @tc.expected: step4. onComplete should be called, and status is time_out
1134 */
1135 ASSERT_TRUE(result.size() == devices.size());
1136 for (const auto &pair : result) {
1137 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1138 EXPECT_TRUE(pair.second == OK);
1139 }
1140 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1141 }
1142
1143 /**
1144 * @tc.name: SyncRetry002
1145 * @tc.desc: use sync retry sync use pull
1146 * @tc.type: FUNC
1147 * @tc.require: AR000CKRTD AR000CQE0E
1148 * @tc.author: zhuwentao
1149 */
1150 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry002, TestSize.Level3)
1151 {
1152 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE, 4u);
1153 std::vector<std::string> devices;
1154 devices.push_back(g_deviceB->GetDeviceId());
1155
1156 /**
1157 * @tc.steps: step1. set sync retry
1158 * @tc.expected: step1, Pragma return OK.
1159 */
1160 int pragmaData = 1;
1161 PragmaData input = static_cast<PragmaData>(&pragmaData);
1162 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1163
1164 /**
1165 * @tc.steps: step2. deviceA call sync and wait
1166 * @tc.expected: step2. sync should return OK.
1167 */
1168 std::map<std::string, DBStatus> result;
1169 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
1170
1171 /**
1172 * @tc.expected: step3. onComplete should be called, and status is time_out
1173 */
1174 ASSERT_TRUE(result.size() == devices.size());
1175 for (const auto &pair : result) {
1176 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1177 EXPECT_TRUE(pair.second == TIME_OUT);
1178 }
1179 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1180 }
1181
1182 /**
1183 * @tc.name: SyncRetry003
1184 * @tc.desc: use sync retry sync use push by compress
1185 * @tc.type: FUNC
1186 * @tc.require: AR000CKRTD AR000CQE0E
1187 * @tc.author: zhuwentao
1188 */
1189 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry003, TestSize.Level3)
1190 {
1191 if (g_kvDelegatePtr != nullptr) {
1192 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1193 g_kvDelegatePtr = nullptr;
1194 }
1195 /**
1196 * @tc.steps: step1. open db use Compress
1197 * @tc.expected: step1, Pragma return OK.
1198 */
1199 KvStoreNbDelegate::Option option;
1200 option.isNeedCompressOnSync = true;
1201 option.compressionRate = 70;
1202 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1203 ASSERT_TRUE(g_kvDelegateStatus == OK);
1204 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1205
1206 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1207 std::vector<std::string> devices;
1208 devices.push_back(g_deviceB->GetDeviceId());
1209
1210 /**
1211 * @tc.steps: step2. set sync retry
1212 * @tc.expected: step2, Pragma return OK.
1213 */
1214 int pragmaData = 1;
1215 PragmaData input = static_cast<PragmaData>(&pragmaData);
1216 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1217
1218 /**
1219 * @tc.steps: step3. deviceA put {k1, v1}, {k2, v2}
1220 */
1221 ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
1222
1223 /**
1224 * @tc.steps: step4. deviceA call sync and wait
1225 * @tc.expected: step4. sync should return OK.
1226 */
1227 std::map<std::string, DBStatus> result;
1228 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1229
1230 /**
1231 * @tc.expected: step5. onComplete should be called, and status is time_out
1232 */
1233 ASSERT_TRUE(result.size() == devices.size());
1234 for (const auto &pair : result) {
1235 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1236 EXPECT_TRUE(pair.second == OK);
1237 }
1238 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1239 }
1240
1241 /**
1242 * @tc.name: SyncRetry004
1243 * @tc.desc: use query sync retry sync use push
1244 * @tc.type: FUNC
1245 * @tc.require: AR000CKRTD AR000CQE0E
1246 * @tc.author: zhuwentao
1247 */
1248 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry004, TestSize.Level3)
1249 {
1250 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1251 std::vector<std::string> devices;
1252 devices.push_back(g_deviceB->GetDeviceId());
1253
1254 /**
1255 * @tc.steps: step1. set sync retry
1256 * @tc.expected: step1, Pragma return OK.
1257 */
1258 int pragmaData = 1;
1259 PragmaData input = static_cast<PragmaData>(&pragmaData);
1260 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1261
1262 /**
1263 * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
1264 */
1265 for (int i = 0; i < 5; i++) {
1266 Key key = DistributedDBToolsUnitTest::GetRandPrefixKey({'a', 'b'}, 128); // rand num 1024 for test
1267 Value value;
1268 DistributedDBToolsUnitTest::GetRandomKeyValue(value, 256u);
1269 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
1270 }
1271
1272 /**
1273 * @tc.steps: step3. deviceA call sync and wait
1274 * @tc.expected: step3. sync should return OK.
1275 */
1276 std::map<std::string, DBStatus> result;
1277 std::vector<uint8_t> prefixKey({'a', 'b'});
1278 Query query = Query::Select().PrefixKey(prefixKey);
1279 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query) == OK);
1280
1281 /**
1282 * @tc.expected: step4. onComplete should be called, and status is time_out
1283 */
1284 ASSERT_TRUE(result.size() == devices.size());
1285 for (const auto &pair : result) {
1286 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1287 EXPECT_TRUE(pair.second == OK);
1288 }
1289 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1290 }
1291
1292 /**
1293 * @tc.name: SyncRetry005
1294 * @tc.desc: use sync retry sync use pull by compress
1295 * @tc.type: FUNC
1296 * @tc.require: AR000CKRTD AR000CQE0E
1297 * @tc.author: zhangqiquan
1298 */
1299 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry005, TestSize.Level3)
1300 {
1301 if (g_kvDelegatePtr != nullptr) {
1302 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1303 g_kvDelegatePtr = nullptr;
1304 }
1305 /**
1306 * @tc.steps: step1. open db use Compress
1307 * @tc.expected: step1, Pragma return OK.
1308 */
1309 KvStoreNbDelegate::Option option;
1310 option.isNeedCompressOnSync = true;
1311 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1312 ASSERT_TRUE(g_kvDelegateStatus == OK);
1313 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1314
1315 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1316 std::vector<std::string> devices;
1317 devices.push_back(g_deviceB->GetDeviceId());
1318
1319 /**
1320 * @tc.steps: step2. set sync retry
1321 * @tc.expected: step2, Pragma return OK.
1322 */
1323 int pragmaData = 1;
1324 PragmaData input = static_cast<PragmaData>(&pragmaData);
1325 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1326
1327 /**
1328 * @tc.steps: step3. deviceA call sync and wait
1329 * @tc.expected: step3. sync should return OK.
1330 */
1331 std::map<std::string, DBStatus> result;
1332 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
1333
1334 /**
1335 * @tc.expected: step4. onComplete should be called, and status is time_out
1336 */
1337 ASSERT_TRUE(result.size() == devices.size());
1338 for (const auto &pair : result) {
1339 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1340 EXPECT_EQ(pair.second, OK);
1341 }
1342 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1343 }
1344
1345 /**
1346 * @tc.name: ReSetWatchDogTest001
1347 * @tc.desc: trigger resetWatchDog while pull
1348 * @tc.type: FUNC
1349 * @tc.require: AR000CKRTD AR000CQE0E
1350 * @tc.author: zhuwentao
1351 */
1352 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, ReSetWaterDogTest001, TestSize.Level3)
1353 {
1354 ASSERT_NO_FATAL_FAILURE(ReSetWaterDogTest001());
1355 }
1356
1357 /**
1358 * @tc.name: RebuildSync001
1359 * @tc.desc: rebuild db and sync again
1360 * @tc.type: FUNC
1361 * @tc.require:
1362 * @tc.author: zhuwentao
1363 */
1364 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync001, TestSize.Level3)
1365 {
1366 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1367 /**
1368 * @tc.steps: step1. sync deviceB data to A and check data
1369 * * @tc.expected: step1. interface return ok
1370 */
1371 Key key1 = {'1'};
1372 Key key2 = {'2'};
1373 Value value = {'1'};
1374 Timestamp currentTime;
1375 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1376 EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1377 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1378 EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
1379 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1380
1381 Value actualValue;
1382 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1383 EXPECT_EQ(actualValue, value);
1384 actualValue.clear();
1385 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1386 EXPECT_EQ(actualValue, value);
1387 /**
1388 * @tc.steps: step2. delete db and rebuild
1389 * * @tc.expected: step2. interface return ok
1390 */
1391 g_mgr.CloseKvStore(g_kvDelegatePtr);
1392 g_kvDelegatePtr = nullptr;
1393 ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1394 KvStoreNbDelegate::Option option;
1395 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1396 ASSERT_TRUE(g_kvDelegateStatus == OK);
1397 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1398 /**
1399 * @tc.steps: step3. sync to device A again
1400 * * @tc.expected: step3. sync ok
1401 */
1402 value = {'2'};
1403 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1404 EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1405 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1406 /**
1407 * @tc.steps: step4. check data in device A
1408 * * @tc.expected: step4. check ok
1409 */
1410 actualValue.clear();
1411 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1412 EXPECT_EQ(actualValue, value);
1413 }
1414
1415 /**
1416 * @tc.name: RebuildSync002
1417 * @tc.desc: test clear remote data when receive data
1418 * @tc.type: FUNC
1419 * @tc.require:
1420 * @tc.author: zhuwentao
1421 */
1422 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync002, TestSize.Level1)
1423 {
1424 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1425 std::vector<std::string> devices;
1426 devices.push_back(g_deviceB->GetDeviceId());
1427 /**
1428 * @tc.steps: step1. device A SET_WIPE_POLICY
1429 * * @tc.expected: step1. interface return ok
1430 */
1431 int pragmaData = 2; // 2 means enable
1432 PragmaData input = static_cast<PragmaData>(&pragmaData);
1433 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_WIPE_POLICY, input) == OK);
1434 /**
1435 * @tc.steps: step2. sync deviceB data to A and check data
1436 * * @tc.expected: step2. interface return ok
1437 */
1438 Key key1 = {'1'};
1439 Key key2 = {'2'};
1440 Key key3 = {'3'};
1441 Key key4 = {'4'};
1442 Value value = {'1'};
1443 Timestamp currentTime;
1444 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1445 EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1446 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1447 EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
1448 EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
1449 /**
1450 * @tc.steps: step3. deviceA call pull sync
1451 * @tc.expected: step3. sync should return OK.
1452 */
1453 std::map<std::string, DBStatus> result;
1454 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result) == OK);
1455
1456 /**
1457 * @tc.expected: step4. onComplete should be called, check data
1458 */
1459 ASSERT_TRUE(result.size() == devices.size());
1460 for (const auto &pair : result) {
1461 EXPECT_TRUE(pair.second == OK);
1462 }
1463 Value actualValue;
1464 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1465 EXPECT_EQ(actualValue, value);
1466 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1467 EXPECT_EQ(actualValue, value);
1468 /**
1469 * @tc.steps: step5. device B rebuild and put some data
1470 * * @tc.expected: step5. rebuild ok
1471 */
1472 if (g_deviceB != nullptr) {
1473 delete g_deviceB;
1474 g_deviceB = nullptr;
1475 }
1476 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
1477 ASSERT_TRUE(g_deviceB != nullptr);
1478 VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
1479 ASSERT_TRUE(syncInterfaceB != nullptr);
1480 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
1481 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1482 EXPECT_EQ(g_deviceB->PutData(key3, value, currentTime, 0), E_OK);
1483 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1484 EXPECT_EQ(g_deviceB->PutData(key4, value, currentTime, 0), E_OK);
1485 /**
1486 * @tc.steps: step6. sync to device A again and check data
1487 * * @tc.expected: step6. sync ok
1488 */
1489 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1490 EXPECT_EQ(g_kvDelegatePtr->Get(key3, actualValue), OK);
1491 EXPECT_EQ(actualValue, value);
1492 EXPECT_EQ(g_kvDelegatePtr->Get(key4, actualValue), OK);
1493 EXPECT_EQ(actualValue, value);
1494 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1495 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), NOT_FOUND);
1496 }
1497
1498 /**
1499 * @tc.name: RebuildSync003
1500 * @tc.desc: test clear history data when receive ack
1501 * @tc.type: FUNC
1502 * @tc.require:
1503 * @tc.author: zhuwentao
1504 */
1505 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync003, TestSize.Level1)
1506 {
1507 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1508 /**
1509 * @tc.steps: step1. sync deviceB data to A and check data
1510 * * @tc.expected: step1. interface return ok
1511 */
1512 Key key1 = {'1'};
1513 Key key2 = {'2'};
1514 Key key3 = {'3'};
1515 Key key4 = {'4'};
1516 Value value = {'1'};
1517 EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1518 EXPECT_EQ(g_deviceB->PutData(key2, value, 2u, 0), E_OK); // 2: timestamp
1519 EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
1520 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1521 Value actualValue;
1522 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1523 EXPECT_EQ(actualValue, value);
1524 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1525 EXPECT_EQ(actualValue, value);
1526 VirtualDataItem item;
1527 EXPECT_EQ(g_deviceB->GetData(key3, item), E_OK);
1528 EXPECT_EQ(item.value, value);
1529 /**
1530 * @tc.steps: step2. device B sync to device A,but make it failed
1531 * * @tc.expected: step2. interface return ok
1532 */
1533 EXPECT_EQ(g_deviceB->PutData(key4, value, 3u, 0), E_OK); // 3: timestamp
1534 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_A, DATA_SYNC_MESSAGE);
1535 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1536 /**
1537 * @tc.steps: step3. device B set delay send time
1538 * * @tc.expected: step3. interface return ok
1539 */
1540 std::set<std::string> delayDevice = {DEVICE_B};
1541 g_communicatorAggregator->SetSendDelayInfo(3000u, DATA_SYNC_MESSAGE, 1u, 0u, delayDevice); // delay 3000ms one time
1542 /**
1543 * @tc.steps: step4. device A rebuilt, device B push data to A and set clear remote data mark into context after 1s
1544 * * @tc.expected: step4. interface return ok
1545 */
1546 g_deviceB->SetClearRemoteStaleData(true);
1547 g_mgr.CloseKvStore(g_kvDelegatePtr);
1548 g_kvDelegatePtr = nullptr;
1549 ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1550 KvStoreNbDelegate::Option option;
1551 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1552 ASSERT_TRUE(g_kvDelegateStatus == OK);
1553 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1554 std::map<std::string, DBStatus> result;
1555 std::vector<std::string> devices = {g_deviceB->GetDeviceId()};
1556 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1557 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1558 /**
1559 * @tc.steps: step5. device B sync to A, make it clear history data and check data
1560 * * @tc.expected: step5. interface return ok
1561 */
1562 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1563 EXPECT_EQ(g_deviceB->GetData(key3, item), -E_NOT_FOUND);
1564 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1565 EXPECT_EQ(actualValue, value);
1566 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1567 EXPECT_EQ(actualValue, value);
1568 g_communicatorAggregator->ResetSendDelayInfo();
1569 }
1570
1571 /**
1572 * @tc.name: RemoveDeviceData001
1573 * @tc.desc: call rekey and removeDeviceData Concurrently
1574 * @tc.type: FUNC
1575 * @tc.require: AR000D487B
1576 * @tc.author: zhuwentao
1577 */
1578 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RemoveDeviceData001, TestSize.Level1)
1579 {
1580 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1581 /**
1582 * @tc.steps: step1. sync deviceB data to A
1583 * * @tc.expected: step1. interface return ok
1584 */
1585 Key key1 = {'1'};
1586 Key key2 = {'2'};
1587 Value value = {'1'};
1588 g_deviceB->PutData(key1, value, 1, 0);
1589 g_deviceB->PutData(key2, value, 2, 0);
1590 g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true);
1591
1592 Value actualValue;
1593 g_kvDelegatePtr->Get(key1, actualValue);
1594 EXPECT_EQ(actualValue, value);
1595 actualValue.clear();
1596 g_kvDelegatePtr->Get(key2, actualValue);
1597 EXPECT_EQ(actualValue, value);
1598 /**
1599 * @tc.steps: step2. call Rekey and RemoveDeviceData Concurrently
1600 * * @tc.expected: step2. interface return ok
1601 */
__anon61e499a11302() 1602 std::thread thread1([]() {
1603 CipherPassword passwd3;
1604 std::vector<uint8_t> passwdVect = {'p', 's', 'd', 'z'};
1605 passwd3.SetValue(passwdVect.data(), passwdVect.size());
1606 g_kvDelegatePtr->Rekey(passwd3);
1607 });
__anon61e499a11402() 1608 std::thread thread2([]() {
1609 g_kvDelegatePtr->RemoveDeviceData(g_deviceB->GetDeviceId());
1610 });
1611 thread1.join();
1612 thread2.join();
1613 }
1614
1615 /**
1616 * @tc.name: DeviceOfflineSyncTask001
1617 * @tc.desc: Test sync task when device offline and close db Concurrently
1618 * @tc.type: FUNC
1619 * @tc.require: AR000HI2JS
1620 * @tc.author: zhuwentao
1621 */
1622 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask001, TestSize.Level3)
1623 {
1624 DBStatus status = OK;
1625 std::vector<std::string> devices;
1626 devices.push_back(g_deviceB->GetDeviceId());
1627
1628 /**
1629 * @tc.steps: step1. deviceA put {k1, v1}
1630 */
1631 Key key = {'1'};
1632 Value value = {'1'};
1633 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1634
1635 /**
1636 * @tc.steps: step2. deviceA set auto sync and put some key/value
1637 * @tc.expected: step2. interface should return OK.
1638 */
1639 bool autoSync = true;
1640 PragmaData data = static_cast<PragmaData>(&autoSync);
1641 status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1642 ASSERT_EQ(status, OK);
1643
1644 Key key1 = {'2'};
1645 Key key2 = {'3'};
1646 Key key3 = {'4'};
1647 Key key4 = {'5'};
1648 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1649 ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value) == OK);
1650 ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value) == OK);
1651 ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value) == OK);
1652 ASSERT_TRUE(g_kvDelegatePtr->Put(key4, value) == OK);
1653 /**
1654 * @tc.steps: step3. device offline and close db Concurrently
1655 * @tc.expected: step3. interface should return OK.
1656 */
__anon61e499a11502() 1657 std::thread thread1([]() {
1658 g_mgr.CloseKvStore(g_kvDelegatePtr);
1659 g_kvDelegatePtr = nullptr;
1660 });
__anon61e499a11602() 1661 std::thread thread2([]() {
1662 g_deviceB->Offline();
1663 });
1664 thread1.join();
1665 thread2.join();
1666 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1667 ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1668 }
1669
1670 /**
1671 * @tc.name: DeviceOfflineSyncTask002
1672 * @tc.desc: Test sync task when autoSync and close db Concurrently
1673 * @tc.type: FUNC
1674 * @tc.require:
1675 * @tc.author: zhuwentao
1676 */
1677 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask002, TestSize.Level3)
1678 {
1679 DBStatus status = OK;
1680 g_deviceC->Offline();
1681
1682 /**
1683 * @tc.steps: step1. deviceA put {k1, v1}
1684 */
1685 Key key = {'1'};
1686 Value value = {'1'};
1687 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1688
1689 /**
1690 * @tc.steps: step2. deviceA set auto sync and put some key/value
1691 * @tc.expected: step2. interface should return OK.
1692 */
1693 bool autoSync = true;
1694 PragmaData data = static_cast<PragmaData>(&autoSync);
1695 status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1696 ASSERT_EQ(status, OK);
1697 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME * 2));
1698
1699 Key key1 = {'2'};
1700 Key key2 = {'3'};
1701 Key key3 = {'4'};
1702 ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value) == OK);
1703 ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value) == OK);
1704 ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value) == OK);
1705 /**
1706 * @tc.steps: step3. close db
1707 * @tc.expected: step3. interface should return OK.
1708 */
1709 g_mgr.CloseKvStore(g_kvDelegatePtr);
1710 g_kvDelegatePtr = nullptr;
1711 ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1712 }
1713
1714 /**
1715 * @tc.name: DeviceOfflineSyncTask003
1716 * @tc.desc: Test sync task when device offline after call sync
1717 * @tc.type: FUNC
1718 * @tc.require:
1719 * @tc.author: zhuwentao
1720 */
1721 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask003, TestSize.Level3)
1722 {
1723 std::vector<std::string> devices;
1724 devices.push_back(g_deviceB->GetDeviceId());
1725
1726 /**
1727 * @tc.steps: step1. deviceA put {k1, v1}
1728 */
1729 Key key = {'1'};
1730 Value value = {'1'};
1731 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1732 /**
1733 * @tc.steps: step2. device offline after call sync
1734 * @tc.expected: step2. interface should return OK.
1735 */
1736 Query query = Query::Select().PrefixKey(key);
1737 ASSERT_TRUE(g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, query, false) == OK);
1738 std::this_thread::sleep_for(std::chrono::milliseconds(15)); // wait for 15ms
1739 g_deviceB->Offline();
1740 }
1741
1742 /**
1743 * @tc.name: GetSyncDataFail001
1744 * @tc.desc: test get sync data failed when sync
1745 * @tc.type: FUNC
1746 * @tc.require:
1747 * @tc.author: zhuwentao
1748 */
1749 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail001, TestSize.Level1)
1750 {
1751 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1752 /**
1753 * @tc.steps: step1. device B set get data errCode control and put some data
1754 * * @tc.expected: step1. interface return ok
1755 */
1756 g_deviceB->SetGetDataErrCode(1, -E_BUSY, true);
1757 Key key1 = {'1'};
1758 Value value = {'1'};
1759 EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1760 /**
1761 * @tc.steps: step2. device B sync to device A and check data
1762 * * @tc.expected: step2. interface return ok
1763 */
1764 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1765 Value actualValue;
1766 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1767 g_deviceB->ResetDataControl();
1768 }
1769
1770 /**
1771 * @tc.name: GetSyncDataFail002
1772 * @tc.desc: test get sync data failed when sync with large data
1773 * @tc.type: FUNC
1774 * @tc.require: AR000D487B
1775 * @tc.author: zhuwentao
1776 */
1777 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail002, TestSize.Level1)
1778 {
1779 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1780 /**
1781 * @tc.steps: step1. device B set get data errCode control and put some data
1782 * * @tc.expected: step1. interface return ok
1783 */
1784 g_deviceB->SetGetDataErrCode(2, -E_BUSY, true);
1785 int totalSize = 4000u;
1786 std::vector<Entry> entries;
1787 std::vector<Key> keys;
1788 const int keyLen = 10; // 20 Bytes
1789 const int valueLen = 10; // 20 Bytes
1790 DistributedDBUnitTest::GenerateRecords(totalSize, entries, keys, keyLen, valueLen);
1791 uint32_t i = 1u;
1792 for (const auto &entry : entries) {
1793 EXPECT_EQ(g_deviceB->PutData(entry.key, entry.value, i, 0), E_OK);
1794 i++;
1795 }
1796 /**
1797 * @tc.steps: step2. device B sync to device A and check data
1798 * * @tc.expected: step2. interface return ok
1799 */
1800 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1801 std::this_thread::sleep_for(std::chrono::seconds(1));
1802 Value actualValue;
1803 for (int j = 1u; j <= totalSize; j++) {
1804 if (j > totalSize / 2) {
1805 EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), NOT_FOUND);
1806 } else {
1807 EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), OK);
1808 }
1809 }
1810 g_deviceB->ResetDataControl();
1811 }
1812
1813 /**
1814 * @tc.name: GetSyncDataFail003
1815 * @tc.desc: test get sync data E_EKEYREVOKED failed in push_and_pull sync
1816 * @tc.type: FUNC
1817 * @tc.require:
1818 * @tc.author: zhuwentao
1819 */
1820 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail003, TestSize.Level1)
1821 {
1822 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1823 /**
1824 * @tc.steps: step1. device B set get data errCode control and put some data
1825 * * @tc.expected: step1. interface return ok
1826 */
1827 g_deviceB->SetGetDataErrCode(1, -E_EKEYREVOKED, true);
1828 Key key1 = {'1'};
1829 Key key2 = {'3'};
1830 Value value = {'1'};
1831 EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1832 EXPECT_EQ(g_kvDelegatePtr->Put(key2, value), OK);
1833 /**
1834 * @tc.steps: step2. device B sync to device A and check data
1835 * * @tc.expected: step2. interface return ok
1836 */
1837 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1838 Value actualValue;
1839 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1840 VirtualDataItem item;
1841 EXPECT_EQ(g_deviceB->GetData(key2, item), E_OK);
1842 g_deviceB->ResetDataControl();
1843 }
1844
1845 /**
1846 * @tc.name: GetSyncDataFail004
1847 * @tc.desc: test get sync data E_EKEYREVOKED failed in push_and_pull sync
1848 * @tc.type: FUNC
1849 * @tc.require:
1850 * @tc.author: zhuwentao
1851 */
1852 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail004, TestSize.Level1)
1853 {
1854 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1855 /**
1856 * @tc.steps: step1. device B set get data errCode control and put some data
1857 * * @tc.expected: step1. interface return ok
1858 */
1859 g_deviceB->SetGetDataErrCode(2, -E_EKEYREVOKED, true);
1860 int totalSize = 4000u;
1861 std::vector<Entry> entries;
1862 std::vector<Key> keys;
1863 const int keyLen = 10; // 20 Bytes
1864 const int valueLen = 10; // 20 Bytes
1865 DistributedDBUnitTest::GenerateRecords(totalSize, entries, keys, keyLen, valueLen);
1866 uint32_t i = 1u;
1867 for (const auto &entry : entries) {
1868 EXPECT_EQ(g_deviceB->PutData(entry.key, entry.value, i, 0), E_OK);
1869 i++;
1870 }
1871 Key key = {'a', 'b', 'c'};
1872 Value value = {'1'};
1873 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
1874 /**
1875 * @tc.steps: step2. device B sync to device A and check data
1876 * * @tc.expected: step2. interface return ok
1877 */
1878 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1879 std::this_thread::sleep_for(std::chrono::seconds(1));
1880 Value actualValue;
1881 for (int j = 1u; j <= totalSize; j++) {
1882 if (j > totalSize / 2) {
1883 EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), NOT_FOUND);
1884 } else {
1885 EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), OK);
1886 }
1887 }
1888 VirtualDataItem item;
1889 EXPECT_EQ(g_deviceB->GetData(key, item), E_OK);
1890 g_deviceB->ResetDataControl();
1891 }
1892
1893 /**
1894 * @tc.name: InterceptDataFail001
1895 * @tc.desc: test intercept data failed when sync
1896 * @tc.type: FUNC
1897 * @tc.require:
1898 * @tc.author: zhuwentao
1899 */
1900 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, InterceptDataFail001, TestSize.Level1)
1901 {
1902 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1903 /**
1904 * @tc.steps: step1. device A set intercept data errCode and put some data
1905 * * @tc.expected: step1. interface return ok
1906 */
1907 g_kvDelegatePtr->SetPushDataInterceptor(
__anon61e499a11702(InterceptedData &data, const std::string &sourceID, const std::string &targetID) 1908 [](InterceptedData &data, const std::string &sourceID, const std::string &targetID) {
1909 int errCode = OK;
1910 auto entries = data.GetEntries();
1911 LOGD("====here111,size=%d", entries.size());
1912 for (size_t i = 0; i < entries.size(); i++) {
1913 Key newKey;
1914 errCode = data.ModifyKey(i, newKey);
1915 if (errCode != OK) {
1916 break;
1917 }
1918 }
1919 return errCode;
1920 }
1921 );
1922 Key key = {'1'};
1923 Value value = {'1'};
1924 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
1925 /**
1926 * @tc.steps: step2. device A sync to device B and check data
1927 * * @tc.expected: step2. interface return ok
1928 */
1929 std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
1930 std::map<std::string, DBStatus> result;
1931 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1932 ASSERT_TRUE(result.size() == devices.size());
1933 for (const auto &pair : result) {
1934 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1935 EXPECT_TRUE(pair.second == INTERCEPT_DATA_FAIL);
1936 }
1937 VirtualDataItem item;
1938 EXPECT_EQ(g_deviceB->GetData(key, item), -E_NOT_FOUND);
1939 }
1940
1941 /**
1942 * @tc.name: InterceptDataFail002
1943 * @tc.desc: test intercept data failed when sync
1944 * @tc.type: FUNC
1945 * @tc.require:
1946 * @tc.author: zhangqiquan
1947 */
1948 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, InterceptDataFail002, TestSize.Level0)
1949 {
1950 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1951 /**
1952 * @tc.steps: step1. device A set intercept data errCode and B put some data
1953 * @tc.expected: step1. interface return ok
1954 */
1955 g_kvDelegatePtr->SetReceiveDataInterceptor(
__anon61e499a11802(InterceptedData &data, const std::string &sourceID, const std::string &targetID) 1956 [](InterceptedData &data, const std::string &sourceID, const std::string &targetID) {
1957 auto entries = data.GetEntries();
1958 LOGD("====on receive,size=%d", entries.size());
1959 for (size_t i = 0; i < entries.size(); i++) {
1960 Key newKey;
1961 int errCode = data.ModifyKey(i, newKey);
1962 if (errCode != OK) {
1963 return errCode;
1964 }
1965 }
1966 return E_OK;
1967 }
1968 );
1969 Key key = {'1'};
1970 Value value = {'1'};
1971 g_deviceB->PutData(key, value, 1u, 0); // 1 is timestamp
1972 /**
1973 * @tc.steps: step2. device A sync to device B and check data
1974 * @tc.expected: step2. interface return ok
1975 */
1976 std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
1977 std::map<std::string, DBStatus> result;
1978 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
1979 ASSERT_TRUE(result.size() == devices.size());
1980 for (const auto &pair : result) {
1981 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1982 EXPECT_EQ(pair.second, INTERCEPT_DATA_FAIL);
1983 }
1984 Value actualValue;
1985 EXPECT_EQ(g_kvDelegatePtr->Get(key, actualValue), NOT_FOUND);
1986 }
1987
1988 /**
1989 * @tc.name: InterceptData001
1990 * @tc.desc: test intercept receive data when sync
1991 * @tc.type: FUNC
1992 * @tc.require:
1993 * @tc.author: zhangqiquan
1994 */
1995 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, InterceptData001, TestSize.Level0)
1996 {
1997 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1998 /**
1999 * @tc.steps: step1. device A set intercept data errCode and B put some data
2000 * @tc.expected: step1. interface return ok
2001 */
2002 g_kvDelegatePtr->SetReceiveDataInterceptor(
__anon61e499a11902(InterceptedData &data, const std::string &sourceID, const std::string &targetID) 2003 [](InterceptedData &data, const std::string &sourceID, const std::string &targetID) {
2004 auto entries = data.GetEntries();
2005 LOGD("====on receive,size=%d", entries.size());
2006 for (size_t i = 0; i < entries.size(); i++) {
2007 Key newKey = {'2'};
2008 int errCode = data.ModifyKey(i, newKey);
2009 if (errCode != OK) {
2010 return errCode;
2011 }
2012 Value newValue = {'3'};
2013 errCode = data.ModifyValue(i, newValue);
2014 if (errCode != OK) {
2015 return errCode;
2016 }
2017 }
2018 return E_OK;
2019 }
2020 );
2021 Key key = {'1'};
2022 Value value = {'1'};
2023 g_deviceB->PutData(key, value, 1u, 0); // 1 is timestamp
2024 /**
2025 * @tc.steps: step2. device A sync to device B and check data
2026 * @tc.expected: step2. interface return ok
2027 */
2028 std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2029 std::map<std::string, DBStatus> result;
2030 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
2031 ASSERT_TRUE(result.size() == devices.size());
2032 for (const auto &pair : result) {
2033 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2034 EXPECT_EQ(pair.second, OK);
2035 }
2036 Value actualValue;
2037 EXPECT_EQ(g_kvDelegatePtr->Get(key, actualValue), NOT_FOUND);
2038 key = {'2'};
2039 EXPECT_EQ(g_kvDelegatePtr->Get(key, actualValue), OK);
2040 value = {'3'};
2041 EXPECT_EQ(actualValue, value);
2042 }
2043
2044 /**
2045 * @tc.name: UpdateKey001
2046 * @tc.desc: test update key can effect local data and sync data, without delete data
2047 * @tc.type: FUNC
2048 * @tc.require:
2049 * @tc.author: zhangqiquan
2050 */
2051 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, UpdateKey001, TestSize.Level1)
2052 {
2053 /**
2054 * @tc.steps: step1. device A set sync data (k1, v1) local data (k2, v2) (k3, v3) and delete (k4, v4)
2055 * @tc.expected: step1. put data return ok
2056 */
2057 Key k1 = {'k', '1'};
2058 Value v1 = {'v', '1'};
2059 g_deviceB->PutData(k1, v1, 1, 0);
2060 ASSERT_EQ(g_deviceB->Sync(SyncMode::SYNC_MODE_PUSH_ONLY, true), E_OK);
2061 Value actualValue;
2062 EXPECT_EQ(g_kvDelegatePtr->Get(k1, actualValue), OK);
2063 EXPECT_EQ(v1, actualValue);
2064 Key k2 = {'k', '2'};
2065 Value v2 = {'v', '2'};
2066 Key k3 = {'k', '3'};
2067 Value v3 = {'v', '3'};
2068 Key k4 = {'k', '4'};
2069 Value v4 = {'v', '4'};
2070 EXPECT_EQ(g_kvDelegatePtr->Put(k2, v2), OK);
2071 EXPECT_EQ(g_kvDelegatePtr->Put(k3, v3), OK);
2072 EXPECT_EQ(g_kvDelegatePtr->Put(k4, v4), OK);
2073 EXPECT_EQ(g_kvDelegatePtr->Delete(k4), OK);
2074 /**
2075 * @tc.steps: step2. device A update key and set
2076 * @tc.expected: step2. put data return ok
2077 */
__anon61e499a11a02(const Key &originKey, Key &newKey) 2078 DBStatus status = g_kvDelegatePtr->UpdateKey([](const Key &originKey, Key &newKey) {
2079 newKey = originKey;
2080 newKey.push_back('0');
2081 });
2082 EXPECT_EQ(status, OK);
2083 k1.push_back('0');
2084 k2.push_back('0');
2085 k3.push_back('0');
2086 EXPECT_EQ(g_kvDelegatePtr->Get(k1, actualValue), OK);
2087 EXPECT_EQ(v1, actualValue);
2088 EXPECT_EQ(g_kvDelegatePtr->Get(k2, actualValue), OK);
2089 EXPECT_EQ(v2, actualValue);
2090 EXPECT_EQ(g_kvDelegatePtr->Get(k3, actualValue), OK);
2091 EXPECT_EQ(v3, actualValue);
2092 }
2093
2094 /**
2095 * @tc.name: MetaBusy001
2096 * @tc.desc: test sync normal when update water mark busy
2097 * @tc.type: FUNC
2098 * @tc.require:
2099 * @tc.author: zhangqiquan
2100 */
2101 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, MetaBusy001, TestSize.Level1)
2102 {
2103 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2104 Key key = {'1'};
2105 Value value = {'1'};
2106 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
2107 std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2108 std::map<std::string, DBStatus> result;
2109 ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result), OK);
2110 ASSERT_EQ(result.size(), devices.size());
2111 for (const auto &pair : result) {
2112 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2113 EXPECT_TRUE(pair.second == OK);
2114 }
2115 value = {'2'};
2116 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
__anon61e499a11b02() 2117 g_deviceB->SetSaveDataCallback([] () {
2118 RuntimeContext::GetInstance()->ScheduleTask([]() {
2119 g_deviceB->EraseWaterMark("real_device");
2120 });
2121 std::this_thread::sleep_for(std::chrono::seconds(1));
2122 });
2123 EXPECT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result), OK);
2124 EXPECT_EQ(result.size(), devices.size());
2125 for (const auto &pair : result) {
2126 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2127 EXPECT_TRUE(pair.second == OK);
2128 }
2129 g_deviceB->SetSaveDataCallback(nullptr);
2130 RuntimeContext::GetInstance()->StopTaskPool();
2131 }