1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <condition_variable>
17 #include <gtest/gtest.h>
18 #include <thread>
19
20 #include "db_constant.h"
21 #include "db_common.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "mock_sync_task_context.h"
27 #include "platform_specific.h"
28 #include "single_ver_data_sync.h"
29 #include "single_ver_kv_sync_task_context.h"
30
31 using namespace testing::ext;
32 using namespace DistributedDB;
33 using namespace DistributedDBUnitTest;
34 using namespace std;
35
36 namespace {
// Thin test subclass of SingleVerKvSyncTaskContext; it adds nothing beyond a
// public, defaulted constructor.
class TestSingleVerKvSyncTaskContext : public SingleVerKvSyncTaskContext {
public:
    TestSingleVerKvSyncTaskContext() = default;
};
// Root directory of the test databases; filled in by TestDirInit() in SetUpTestCase.
std::string g_testDir;
// Identifier of the store that SetUp opens as device A.
const std::string STORE_ID("kv_stroe_complex_sync_test");
// Base wait interval; callers scale it and feed it to std::chrono::milliseconds.
const int WAIT_TIME = 1000;
// Device identifiers: A is the local store, B and C are KvVirtualDevice peers.
const std::string DEVICE_A("real_device");
const std::string DEVICE_B("deviceB");
const std::string DEVICE_C("deviceC");
// DDL executed through raw SQLite by EncryptedAlgoUpgrade001 to create the sync_data table.
// Adjacent literals are concatenated by the compiler, so no separators are inserted.
const std::string CREATE_SYNC_TABLE_SQL(
    "CREATE TABLE IF NOT EXISTS sync_data("
    "key         BLOB NOT NULL,"
    "value       BLOB,"
    "timestamp   INT  NOT NULL,"
    "flag        INT  NOT NULL,"
    "device      BLOB,"
    "ori_device  BLOB,"
    "hash_key    BLOB PRIMARY KEY NOT NULL,"
    "w_timestamp INT);");
57
58 KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
59 KvStoreConfig g_config;
60 DistributedDBToolsUnitTest g_tool;
61 DBStatus g_kvDelegateStatus = INVALID_ARGS;
62 KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
63 VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
64 KvVirtualDevice *g_deviceB = nullptr;
65 KvVirtualDevice *g_deviceC = nullptr;
66
67 // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
68 auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
69 placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
70
PullSyncTest()71 void PullSyncTest()
72 {
73 DBStatus status = OK;
74 std::vector<std::string> devices;
75 devices.push_back(g_deviceB->GetDeviceId());
76
77 Key key = {'1'};
78 Key key2 = {'2'};
79 Value value = {'1'};
80 g_deviceB->PutData(key, value, 0, 0);
81 g_deviceB->PutData(key2, value, 1, 0);
82
83 std::map<std::string, DBStatus> result;
84 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
85 ASSERT_TRUE(status == OK);
86
87 ASSERT_TRUE(result.size() == devices.size());
88 for (const auto &pair : result) {
89 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
90 EXPECT_TRUE(pair.second == OK);
91 }
92 Value value3;
93 EXPECT_EQ(g_kvDelegatePtr->Get(key, value3), OK);
94 EXPECT_EQ(value3, value);
95 EXPECT_EQ(g_kvDelegatePtr->Get(key2, value3), OK);
96 EXPECT_EQ(value3, value);
97 }
98
CrudTest()99 void CrudTest()
100 {
101 vector<Entry> entries;
102 int totalSize = 10;
103 for (int i = 0; i < totalSize; i++) {
104 Entry entry;
105 entry.key.push_back(i);
106 entry.value.push_back('2');
107 entries.push_back(entry);
108 }
109 EXPECT_TRUE(g_kvDelegatePtr->PutBatch(entries) == OK);
110 for (const auto &entry : entries) {
111 Value resultvalue;
112 EXPECT_TRUE(g_kvDelegatePtr->Get(entry.key, resultvalue) == OK);
113 EXPECT_TRUE(resultvalue == entry.value);
114 }
115 for (int i = 0; i < totalSize / 2; i++) { // 2: Half of the total
116 g_kvDelegatePtr->Delete(entries[i].key);
117 Value resultvalue;
118 EXPECT_TRUE(g_kvDelegatePtr->Get(entries[i].key, resultvalue) == NOT_FOUND);
119 }
120 for (int i = totalSize / 2; i < totalSize; i++) {
121 Value value = entries[i].value;
122 value.push_back('x');
123 EXPECT_TRUE(g_kvDelegatePtr->Put(entries[i].key, value) == OK);
124 Value resultvalue;
125 EXPECT_TRUE(g_kvDelegatePtr->Get(entries[i].key, resultvalue) == OK);
126 EXPECT_TRUE(resultvalue == value);
127 }
128 }
129
DataSync005()130 void DataSync005()
131 {
132 ASSERT_NE(g_communicatorAggregator, nullptr);
133 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
134 ASSERT_TRUE(dataSync != nullptr);
135 dataSync->SendSaveDataNotifyPacket(nullptr, 0, 0, 0, TIME_SYNC_MESSAGE);
136 EXPECT_EQ(g_communicatorAggregator->GetOnlineDevices().size(), 3u); // 3 online dev
137 delete dataSync;
138 }
139
DataSync008()140 void DataSync008()
141 {
142 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
143 ASSERT_TRUE(dataSync != nullptr);
144 auto context = new (std::nothrow) MockSyncTaskContext();
145 dataSync->PutDataMsg(nullptr);
146 bool isNeedHandle = false;
147 bool isContinue = false;
148 EXPECT_EQ(dataSync->MoveNextDataMsg(context, isNeedHandle, isContinue), nullptr);
149 EXPECT_EQ(isNeedHandle, false);
150 EXPECT_EQ(isContinue, false);
151 delete dataSync;
152 delete context;
153 }
154
ReSetWaterDogTest001()155 void ReSetWaterDogTest001()
156 {
157 /**
158 * @tc.steps: step1. put 10 key/value
159 * @tc.expected: step1, put return OK.
160 */
161 for (int i = 0; i < 5; i++) { // put 5 key
162 Key key = DistributedDBToolsUnitTest::GetRandPrefixKey({'a', 'b'}, 1024); // rand num 1024 for test
163 Value value;
164 DistributedDBToolsUnitTest::GetRandomKeyValue(value, 10 * 50 * 1024u); // 10 * 50 * 1024 = 500k
165 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
166 }
167 /**
168 * @tc.steps: step2. SetDeviceMtuSize
169 * @tc.expected: step2, return OK.
170 */
171 g_communicatorAggregator->SetDeviceMtuSize(DEVICE_A, 50 * 1024u); // 50 * 1024u = 50k
172 g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, 50 * 1024u); // 50 * 1024u = 50k
173 /**
174 * @tc.steps: step3. deviceA,deviceB sync to each other at same time
175 * @tc.expected: step3. sync should return OK.
176 */
177 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, true), E_OK);
178 g_communicatorAggregator->SetDeviceMtuSize(DEVICE_A, 5 * 1024u * 1024u); // 5 * 1024u * 1024u = 5m
179 g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, 5 * 1024u * 1024u); // 5 * 1024u * 1024u = 5m
180 }
181 }
182
183 class DistributedDBSingleVerP2PComplexSyncTest : public testing::Test {
184 public:
185 static void SetUpTestCase(void);
186 static void TearDownTestCase(void);
187 void SetUp();
188 void TearDown();
189 };
190
SetUpTestCase(void)191 void DistributedDBSingleVerP2PComplexSyncTest::SetUpTestCase(void)
192 {
193 /**
194 * @tc.setup: Init datadir and Virtual Communicator.
195 */
196 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
197 g_config.dataDir = g_testDir;
198 g_mgr.SetKvStoreConfig(g_config);
199
200 string dir = g_testDir + "/single_ver";
201 DIR* dirTmp = opendir(dir.c_str());
202 if (dirTmp == nullptr) {
203 OS::MakeDBDirectory(dir);
204 } else {
205 closedir(dirTmp);
206 }
207
208 g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
209 ASSERT_TRUE(g_communicatorAggregator != nullptr);
210 RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
211 }
212
TearDownTestCase(void)213 void DistributedDBSingleVerP2PComplexSyncTest::TearDownTestCase(void)
214 {
215 /**
216 * @tc.teardown: Release virtual Communicator and clear data dir.
217 */
218 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
219 LOGE("rm test db files error!");
220 }
221 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
222 }
223
SetUp(void)224 void DistributedDBSingleVerP2PComplexSyncTest::SetUp(void)
225 {
226 DistributedDBToolsUnitTest::PrintTestCaseInfo();
227 /**
228 * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
229 */
230 KvStoreNbDelegate::Option option;
231 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
232 ASSERT_TRUE(g_kvDelegateStatus == OK);
233 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
234 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
235 ASSERT_TRUE(g_deviceB != nullptr);
236 VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
237 ASSERT_TRUE(syncInterfaceB != nullptr);
238 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
239
240 g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
241 ASSERT_TRUE(g_deviceC != nullptr);
242 VirtualSingleVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
243 ASSERT_TRUE(syncInterfaceC != nullptr);
244 ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
245
246 auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
247 const std::string &deviceId, uint8_t flag) -> bool {
248 return true;
249 };
250 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
251 }
252
TearDown(void)253 void DistributedDBSingleVerP2PComplexSyncTest::TearDown(void)
254 {
255 /**
256 * @tc.teardown: Release device A, B, C
257 */
258 if (g_kvDelegatePtr != nullptr) {
259 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
260 g_kvDelegatePtr = nullptr;
261 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
262 LOGD("delete kv store status %d", status);
263 ASSERT_TRUE(status == OK);
264 }
265 if (g_deviceB != nullptr) {
266 delete g_deviceB;
267 g_deviceB = nullptr;
268 }
269 if (g_deviceC != nullptr) {
270 delete g_deviceC;
271 g_deviceC = nullptr;
272 }
273 PermissionCheckCallbackV2 nullCallback;
274 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
275 }
276
277 /**
278 * @tc.name: SaveDataNotify001
279 * @tc.desc: Test SaveDataNotify function, delay < 30s should sync ok, > 36 should timeout
280 * @tc.type: FUNC
281 * @tc.require: AR000D4876
282 * @tc.author: xushaohua
283 */
284 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SaveDataNotify001, TestSize.Level3)
285 {
286 DBStatus status = OK;
287 const int waitFiveSeconds = 5000;
288 const int waitThirtySeconds = 30000;
289 const int waitThirtySixSeconds = 36000;
290 std::vector<std::string> devices;
291 devices.push_back(g_deviceB->GetDeviceId());
292
293 /**
294 * @tc.steps: step1. deviceA put {k1, v1}
295 */
296 Key key = {'1'};
297 Value value = {'1'};
298 status = g_kvDelegatePtr->Put(key, value);
299 ASSERT_TRUE(status == OK);
300
301 /**
302 * @tc.steps: step2. deviceB set sava data dely 5s
303 */
304 g_deviceB->SetSaveDataDelayTime(waitFiveSeconds);
305
306 /**
307 * @tc.steps: step3. deviceA call sync and wait
308 * @tc.expected: step3. sync should return OK. onComplete should be called, deviceB sync success.
309 */
310 std::map<std::string, DBStatus> result;
311 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
312 ASSERT_TRUE(status == OK);
313 ASSERT_TRUE(result.size() == devices.size());
314 ASSERT_TRUE(result[DEVICE_B] == OK);
315
316 /**
317 * @tc.steps: step4. deviceB set sava data dely 30s and put {k1, v1}
318 */
319 g_deviceB->SetSaveDataDelayTime(waitThirtySeconds);
320 status = g_kvDelegatePtr->Put(key, value);
321 ASSERT_TRUE(status == OK);
322 /**
323 * @tc.steps: step3. deviceA call sync and wait
324 * @tc.expected: step3. sync should return OK. onComplete should be called, deviceB sync success.
325 */
326 result.clear();
327 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
328 ASSERT_TRUE(status == OK);
329 ASSERT_TRUE(result.size() == devices.size());
330 ASSERT_TRUE(result[DEVICE_B] == OK);
331
332 /**
333 * @tc.steps: step4. deviceB set sava data dely 36s and put {k1, v1}
334 */
335 g_deviceB->SetSaveDataDelayTime(waitThirtySixSeconds);
336 status = g_kvDelegatePtr->Put(key, value);
337 ASSERT_TRUE(status == OK);
338 /**
339 * @tc.steps: step5. deviceA call sync and wait
340 * @tc.expected: step5. sync should return OK. onComplete should be called, deviceB sync TIME_OUT.
341 */
342 result.clear();
343 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
344 ASSERT_TRUE(status == OK);
345 ASSERT_TRUE(result.size() == devices.size());
346 ASSERT_TRUE(result[DEVICE_B] == TIME_OUT);
347 }
348
349 /**
350 * @tc.name: SametimeSync001
351 * @tc.desc: Test 2 device sync with each other
352 * @tc.type: FUNC
353 * @tc.require: AR000CCPOM
354 * @tc.author: zhangqiquan
355 */
356 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SametimeSync001, TestSize.Level3)
357 {
358 DBStatus status = OK;
359 std::vector<std::string> devices;
360 devices.push_back(g_deviceB->GetDeviceId());
361
362 int responseCount = 0;
363 int requestCount = 0;
364 Key key = {'1'};
365 Value value = {'1'};
366 /**
367 * @tc.steps: step1. make sure deviceB send pull firstly and response_pull secondly
368 * @tc.expected: step1. deviceA put data when finish push task. put data should return OK.
369 */
370 g_communicatorAggregator->RegOnDispatch([&responseCount, &requestCount, &key, &value](
__anon7f2dc5a00302( const std::string &target, DistributedDB::Message *msg) 371 const std::string &target, DistributedDB::Message *msg) {
372 if (target != "real_device" || msg->GetMessageId() != DATA_SYNC_MESSAGE) {
373 return;
374 }
375
376 if (msg->GetMessageType() == TYPE_RESPONSE) {
377 responseCount++;
378 if (responseCount == 1) { // 1 is the ack which B response A's push task
379 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), DBStatus::OK);
380 std::this_thread::sleep_for(std::chrono::seconds(1));
381 } else if (responseCount == 2) { // 2 is the ack which B response A's response_pull task
382 msg->SetErrorNo(E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
383 }
384 } else if (msg->GetMessageType() == TYPE_REQUEST) {
385 requestCount++;
386 if (requestCount == 1) { // 1 is A push task
387 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2 sec
388 }
389 }
390 });
391 /**
392 * @tc.steps: step2. deviceA,deviceB sync to each other at same time
393 * @tc.expected: step2. sync should return OK.
394 */
395 std::map<std::string, DBStatus> result;
__anon7f2dc5a00402null396 std::thread subThread([]{
397 g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, true);
398 });
399 status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_PULL, result);
400 subThread.join();
401 g_communicatorAggregator->RegOnDispatch(nullptr);
402
403 EXPECT_TRUE(status == OK);
404 ASSERT_TRUE(result.size() == devices.size());
405 EXPECT_TRUE(result[DEVICE_B] == OK);
406 Value actualValue;
407 g_kvDelegatePtr->Get(key, actualValue);
408 EXPECT_EQ(actualValue, value);
409 }
410
411 /**
412 * @tc.name: SametimeSync002
413 * @tc.desc: Test 2 device sync with each other with water error
414 * @tc.type: FUNC
415 * @tc.require: AR000CCPOM
416 * @tc.author: zhangqiquan
417 */
418 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SametimeSync002, TestSize.Level3)
419 {
420 DBStatus status = OK;
421 std::vector<std::string> devices;
422 devices.push_back(g_deviceB->GetDeviceId());
423 g_kvDelegatePtr->Put({'k', '1'}, {'v', '1'});
424 /**
425 * @tc.steps: step1. make sure deviceA push data failed and increase water mark
426 * @tc.expected: step1. deviceA push failed with timeout
427 */
__anon7f2dc5a00502(const std::string &target, DistributedDB::Message *msg) 428 g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
429 ASSERT_NE(msg, nullptr);
430 if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
431 msg->SetMessageId(INVALID_MESSAGE_ID);
432 }
433 });
434 std::map<std::string, DBStatus> result;
__anon7f2dc5a00602(const std::map<std::string, DBStatus> &map) 435 auto callback = [&result](const std::map<std::string, DBStatus> &map) {
436 result = map;
437 };
438 Query query = Query::Select().PrefixKey({'k', '1'});
439 EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
440 ASSERT_TRUE(result.size() == devices.size());
441 EXPECT_TRUE(result[DEVICE_B] == TIME_OUT);
442 /**
443 * @tc.steps: step2. A push to B with query2, sleep 1s for waiting step3
444 * @tc.expected: step2. sync should return OK.
445 */
__anon7f2dc5a00702(const std::string &target, DistributedDB::Message *msg) 446 g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
447 ASSERT_NE(msg, nullptr);
448 if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
449 std::this_thread::sleep_for(std::chrono::seconds(1));
450 }
451 });
__anon7f2dc5a00802null452 std::thread subThread([&devices] {
453 std::map<std::string, DBStatus> result;
454 auto callback = [&result](const std::map<std::string, DBStatus> &map) {
455 result = map;
456 };
457 Query query = Query::Select().PrefixKey({'k', '2'});
458 LOGD("Begin PUSH");
459 EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
460 ASSERT_TRUE(result.size() == devices.size());
461 EXPECT_TRUE(result[DEVICE_A] == OK);
462 });
463 /**
464 * @tc.steps: step3. B pull to A when A is in push task
465 * @tc.expected: step3. sync should return OP_FINISHED_ALL.
466 */
467 std::this_thread::sleep_for(std::chrono::milliseconds(100));
468 std::map<std::string, int> virtualResult;
469 g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, query,
__anon7f2dc5a00a02(const std::map<std::string, int> &map) 470 [&virtualResult](const std::map<std::string, int> &map) {
471 virtualResult = map;
472 }, true);
473 EXPECT_TRUE(status == OK);
474 ASSERT_EQ(virtualResult.size(), devices.size());
475 EXPECT_EQ(virtualResult[DEVICE_A], SyncOperation::OP_FINISHED_ALL);
476 g_communicatorAggregator->RegOnDispatch(nullptr);
477 subThread.join();
478 }
479
480 /**
481 * @tc.name: DatabaseOnlineCallback001
482 * @tc.desc: check database status notify online callback
483 * @tc.type: FUNC
484 * @tc.require: AR000CQS3S SR000CQE0B
485 * @tc.author: zhuwentao
486 */
487 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DatabaseOnlineCallback001, TestSize.Level1)
488 {
489 /**
490 * @tc.steps: step1. SetStoreStatusNotifier
491 * @tc.expected: step1. SetStoreStatusNotifier ok
492 */
493 std::string targetDev = "DEVICE_X";
494 bool isCheckOk = false;
495 auto databaseStatusNotifyCallback = [targetDev, &isCheckOk] (const std::string &userId,
__anon7f2dc5a00b02(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) 496 const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) -> void {
497 if (userId == USER_ID && appId == APP_ID && storeId == STORE_ID && deviceId == targetDev &&
498 onlineStatus == true) {
499 isCheckOk = true;
500 }};
501 g_mgr.SetStoreStatusNotifier(databaseStatusNotifyCallback);
502 /**
503 * @tc.steps: step2. trigger device online
504 * @tc.expected: step2. check callback ok
505 */
506 g_communicatorAggregator->OnlineDevice(targetDev);
507 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME / 20));
508 EXPECT_EQ(isCheckOk, true);
509 StoreStatusNotifier nullCallback;
510 g_mgr.SetStoreStatusNotifier(nullCallback);
511 }
512
513 /**
514 * @tc.name: DatabaseOfflineCallback001
515 * @tc.desc: check database status notify online callback
516 * @tc.type: FUNC
517 * @tc.require: AR000CQS3S SR000CQE0B
518 * @tc.author: zhuwentao
519 */
520 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DatabaseOfflineCallback001, TestSize.Level1)
521 {
522 /**
523 * @tc.steps: step1. SetStoreStatusNotifier
524 * @tc.expected: step1. SetStoreStatusNotifier ok
525 */
526 std::string targetDev = "DEVICE_X";
527 bool isCheckOk = false;
528 auto databaseStatusNotifyCallback = [targetDev, &isCheckOk] (const std::string &userId,
__anon7f2dc5a00c02(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) 529 const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) -> void {
530 if (userId == USER_ID && appId == APP_ID && storeId == STORE_ID && deviceId == targetDev &&
531 onlineStatus == false) {
532 isCheckOk = true;
533 }};
534 g_mgr.SetStoreStatusNotifier(databaseStatusNotifyCallback);
535 /**
536 * @tc.steps: step2. trigger device offline
537 * @tc.expected: step2. check callback ok
538 */
539 g_communicatorAggregator->OfflineDevice(targetDev);
540 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME / 20));
541 EXPECT_EQ(isCheckOk, true);
542 StoreStatusNotifier nullCallback;
543 g_mgr.SetStoreStatusNotifier(nullCallback);
544 }
545
546 /**
547 * @tc.name: CloseSync001
548 * @tc.desc: Test 2 delegate close when sync
549 * @tc.type: FUNC
550 * @tc.require: AR000CCPOM
551 * @tc.author: zhangqiquan
552 */
553 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, CloseSync001, TestSize.Level3)
554 {
555 DBStatus status = OK;
556 std::vector<std::string> devices;
557 devices.push_back(g_deviceB->GetDeviceId());
558
559 /**
560 * @tc.steps: step1. make sure A sync start
561 */
562 bool sleep = false;
__anon7f2dc5a00d02(const std::string &target, DistributedDB::Message *msg) 563 g_communicatorAggregator->RegOnDispatch([&sleep](const std::string &target, DistributedDB::Message *msg) {
564 if (!sleep) {
565 sleep = true;
566 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s for waiting close db
567 }
568 });
569
570 KvStoreNbDelegate* kvDelegatePtrA = nullptr;
571 KvStoreNbDelegate::Option option;
__anon7f2dc5a00e02(DBStatus s, KvStoreNbDelegate *delegate) 572 g_mgr.GetKvStore(STORE_ID, option, [&status, &kvDelegatePtrA](DBStatus s, KvStoreNbDelegate *delegate) {
573 status = s;
574 kvDelegatePtrA = delegate;
575 });
576 EXPECT_EQ(status, OK);
577 EXPECT_NE(kvDelegatePtrA, nullptr);
578
579 Key key = {'k'};
580 Value value = {'v'};
581 kvDelegatePtrA->Put(key, value);
582 std::map<std::string, DBStatus> result;
__anon7f2dc5a00f02(const std::map<std::string, DBStatus>& statusMap) 583 auto callback = [&result](const std::map<std::string, DBStatus>& statusMap) {
584 result = statusMap;
585 };
586 /**
587 * @tc.steps: step2. deviceA sync and then close
588 * @tc.expected: step2. sync should abort and data don't exist in B
589 */
__anon7f2dc5a01002() 590 std::thread closeThread([&kvDelegatePtrA]() {
591 std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for waiting sync start
592 EXPECT_EQ(g_mgr.CloseKvStore(kvDelegatePtrA), OK);
593 });
594 EXPECT_EQ(kvDelegatePtrA->Sync(devices, SYNC_MODE_PUSH_ONLY, callback, false), OK);
595 LOGD("Sync finish");
596 closeThread.join();
597 std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s for waiting sync finish
598 EXPECT_EQ(result.size(), 0u);
599 VirtualDataItem actualValue;
600 EXPECT_EQ(g_deviceB->GetData(key, actualValue), -E_NOT_FOUND);
601 g_communicatorAggregator->RegOnDispatch(nullptr);
602 }
603
604 /**
605 * @tc.name: CloseSync002
606 * @tc.desc: Test 1 delegate close when in time sync
607 * @tc.type: FUNC
608 * @tc.require: AR000CCPOM
609 * @tc.author: zhangqiquan
610 */
611 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, CloseSync002, TestSize.Level3)
612 {
613 /**
614 * @tc.steps: step1. invalid time sync packet from A
615 */
__anon7f2dc5a01102(const std::string &target, DistributedDB::Message *msg) 616 g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
617 ASSERT_NE(msg, nullptr);
618 if (target == DEVICE_B && msg->GetMessageId() == TIME_SYNC_MESSAGE && msg->GetMessageType() == TYPE_REQUEST) {
619 msg->SetMessageId(INVALID_MESSAGE_ID);
620 LOGD("Message is invalid");
621 }
622 });
623 Timestamp currentTime;
624 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
625 g_deviceB->PutData({'k'}, {'v'}, currentTime, 0);
626
627 /**
628 * @tc.steps: step2. B PUSH to A and A close after 1s
629 * @tc.expected: step2. A closing time cost letter than 4s
630 */
__anon7f2dc5a01202() 631 std::thread closingThread([]() {
632 std::this_thread::sleep_for(std::chrono::seconds(1));
633 LOGD("Begin Close");
634 Timestamp beginTime;
635 (void)OS::GetCurrentSysTimeInMicrosecond(beginTime);
636 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
637 Timestamp endTime;
638 (void)OS::GetCurrentSysTimeInMicrosecond(endTime);
639 EXPECT_LE(static_cast<int>(endTime - beginTime), 4 * 1000 * 1000); // waiting 4 * 1000 * 1000 us
640 LOGD("End Close");
641 });
642 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
643 closingThread.join();
644
645 /**
646 * @tc.steps: step3. remove db
647 * @tc.expected: step3. remove ok
648 */
649 g_kvDelegatePtr = nullptr;
650 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
651 LOGD("delete kv store status %d", status);
652 ASSERT_TRUE(status == OK);
653 g_communicatorAggregator->RegOnDispatch(nullptr);
654 }
655
656 /**
657 * @tc.name: OrderbyWriteTimeSync001
658 * @tc.desc: sync query with order by writeTime
659 * @tc.type: FUNC
660 * @tc.require: AR000H5VLO
661 * @tc.author: zhuwentao
662 */
663 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, OrderbyWriteTimeSync001, TestSize.Level0)
664 {
665 /**
666 * @tc.steps: step1. deviceA subscribe query with order by write time
667 * * @tc.expected: step1. interface return not support
668 */
669 std::vector<std::string> devices;
670 devices.push_back(g_deviceB->GetDeviceId());
671 Query query = Query::Select().PrefixKey({'k'}).OrderByWriteTime(true);
672 EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, nullptr, query, true), NOT_SUPPORT);
673 }
674
675
676 /**
677 * @tc.name: Device Offline Sync 001
678 * @tc.desc: Test push sync when device offline
679 * @tc.type: FUNC
680 * @tc.require: AR000CCPOM
681 * @tc.author: xushaohua
682 */
683 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSync001, TestSize.Level1)
684 {
685 std::vector<std::string> devices;
686 devices.push_back(g_deviceB->GetDeviceId());
687 devices.push_back(g_deviceC->GetDeviceId());
688
689 /**
690 * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2}, {k3 delete}, {k4,v2}
691 */
692 Key key1 = {'1'};
693 Value value1 = {'1'};
694 ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value1) == OK);
695
696 Key key2 = {'2'};
697 Value value2 = {'2'};
698 ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value2) == OK);
699
700 Key key3 = {'3'};
701 Value value3 = {'3'};
702 ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value3) == OK);
703 ASSERT_TRUE(g_kvDelegatePtr->Delete(key3) == OK);
704
705 Key key4 = {'4'};
706 Value value4 = {'4'};
707 ASSERT_TRUE(g_kvDelegatePtr->Put(key4, value4) == OK);
708
709 /**
710 * @tc.steps: step2. deviceB offline
711 */
712 g_deviceB->Offline();
713
714 /**
715 * @tc.steps: step3. deviceA call pull sync
716 * @tc.expected: step3. sync should return OK.
717 */
718 std::map<std::string, DBStatus> result;
719 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
720 ASSERT_TRUE(status == OK);
721
722 /**
723 * @tc.expected: step3. onComplete should be called, DeviceB status is timeout
724 * deviceC has {k1, v1}, {k2, v2}, {k3 delete}, {k4,v4}
725 */
726 for (const auto &pair : result) {
727 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
728 if (pair.first == DEVICE_B) {
729 EXPECT_TRUE(pair.second == COMM_FAILURE);
730 } else {
731 EXPECT_TRUE(pair.second == OK);
732 }
733 }
734 VirtualDataItem item;
735 g_deviceC->GetData(key1, item);
736 EXPECT_TRUE(item.value == value1);
737 item.value.clear();
738 g_deviceC->GetData(key2, item);
739 EXPECT_TRUE(item.value == value2);
740 item.value.clear();
741 Key hashKey;
742 DistributedDBToolsUnitTest::CalcHash(key3, hashKey);
743 EXPECT_TRUE(g_deviceC->GetData(hashKey, item) == -E_NOT_FOUND);
744 item.value.clear();
745 g_deviceC->GetData(key4, item);
746 EXPECT_TRUE(item.value == value4);
747 }
748
749 /**
750 * @tc.name: Device Offline Sync 002
751 * @tc.desc: Test pull sync when device offline
752 * @tc.type: FUNC
753 * @tc.require: AR000CCPOM
754 * @tc.author: xushaohua
755 */
756 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSync002, TestSize.Level1)
757 {
758 std::vector<std::string> devices;
759 devices.push_back(g_deviceB->GetDeviceId());
760 devices.push_back(g_deviceC->GetDeviceId());
761
762 /**
763 * @tc.steps: step1. deviceB put {k1, v1}
764 */
765 Key key1 = {'1'};
766 Value value1 = {'1'};
767 g_deviceB->PutData(key1, value1, 0, 0);
768
769 /**
770 * @tc.steps: step2. deviceB offline
771 */
772 g_deviceB->Offline();
773
774 /**
775 * @tc.steps: step3. deviceC put {k2, v2}, {k3, delete}, {k4, v4}
776 */
777 Key key2 = {'2'};
778 Value value2 = {'2'};
779 g_deviceC->PutData(key2, value2, 0, 0);
780
781 Key key3 = {'3'};
782 Value value3 = {'3'};
783 g_deviceC->PutData(key3, value3, 0, 1);
784
785 Key key4 = {'4'};
786 Value value4 = {'4'};
787 g_deviceC->PutData(key4, value4, 0, 0);
788
789 /**
790 * @tc.steps: step2. deviceA call pull sync
791 * @tc.expected: step2. sync should return OK.
792 */
793 std::map<std::string, DBStatus> result;
794 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
795 ASSERT_TRUE(status == OK);
796
797 /**
798 * @tc.expected: step3. onComplete should be called, DeviceB status is timeout
799 * deviceA has {k2, v2}, {k3 delete}, {k4,v4}
800 */
801 for (const auto &pair : result) {
802 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
803 if (pair.first == DEVICE_B) {
804 EXPECT_TRUE(pair.second == COMM_FAILURE);
805 } else {
806 EXPECT_TRUE(pair.second == OK);
807 }
808 }
809
810 Value value5;
811 EXPECT_TRUE(g_kvDelegatePtr->Get(key1, value5) != OK);
812 g_kvDelegatePtr->Get(key2, value5);
813 EXPECT_EQ(value5, value2);
814 EXPECT_TRUE(g_kvDelegatePtr->Get(key3, value5) != OK);
815 g_kvDelegatePtr->Get(key4, value5);
816 EXPECT_EQ(value5, value4);
817 }
818
819 /**
820 * @tc.name: EncryptedAlgoUpgrade001
821 * @tc.desc: Test upgrade encrypted db can sync normally
822 * @tc.type: FUNC
823 * @tc.require: AR000HI2JS
824 * @tc.author: zhuwentao
825 */
826 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, EncryptedAlgoUpgrade001, TestSize.Level3)
827 {
828 /**
829 * @tc.steps: step1. clear db
830 * * @tc.expected: step1. interface return ok
831 */
832 if (g_kvDelegatePtr != nullptr) {
833 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
834 g_kvDelegatePtr = nullptr;
835 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
836 LOGD("delete kv store status %d", status);
837 ASSERT_TRUE(status == OK);
838 }
839
840 CipherPassword passwd;
841 std::vector<uint8_t> passwdVect = {'p', 's', 'd', '1'};
842 passwd.SetValue(passwdVect.data(), passwdVect.size());
843 /**
844 * @tc.steps: step2. open old db by sql
845 * * @tc.expected: step2. interface return ok
846 */
847 std::string identifier = DBCommon::GenerateIdentifierId(STORE_ID, APP_ID, USER_ID);
848 std::string hashDir = DBCommon::TransferHashString(identifier);
849 std::string hexHashDir = DBCommon::TransferStringToHex(hashDir);
850 std::string dbPath = g_testDir + "/" + hexHashDir + "/single_ver";
851 ASSERT_TRUE(DBCommon::CreateDirectory(g_testDir + "/" + hexHashDir) == OK);
852 ASSERT_TRUE(DBCommon::CreateDirectory(dbPath) == OK);
853 std::vector<std::string> dbDir {DBConstant::MAINDB_DIR, DBConstant::METADB_DIR, DBConstant::CACHEDB_DIR};
854 for (const auto &item : dbDir) {
855 ASSERT_TRUE(DBCommon::CreateDirectory(dbPath + "/" + item) == OK);
856 }
857 uint64_t flag = SQLITE_OPEN_URI | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
858 sqlite3 *db;
859 std::string fileUrl = dbPath + "/" + DBConstant::MAINDB_DIR + "/" + DBConstant::SINGLE_VER_DATA_STORE + ".db";
860 ASSERT_TRUE(sqlite3_open_v2(fileUrl.c_str(), &db, flag, nullptr) == SQLITE_OK);
861 SQLiteUtils::SetKeyInner(db, CipherType::AES_256_GCM, passwd, DBConstant::DEFAULT_ITER_TIMES);
862 /**
863 * @tc.steps: step3. create table and close
864 * * @tc.expected: step3. interface return ok
865 */
866 ASSERT_TRUE(SQLiteUtils::ExecuteRawSQL(db, CREATE_SYNC_TABLE_SQL) == E_OK);
867 sqlite3_close_v2(db);
868 db = nullptr;
869 LOGI("create old db success");
870 /**
871 * @tc.steps: step4. get kvstore
872 * * @tc.expected: step4. interface return ok
873 */
874 KvStoreNbDelegate::Option option;
875 option.isEncryptedDb = true;
876 option.cipher = CipherType::AES_256_GCM;
877 option.passwd = passwd;
878 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
879 ASSERT_TRUE(g_kvDelegateStatus == OK);
880 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
881 /**
882 * @tc.steps: step5. sync ok
883 * * @tc.expected: step5. interface return ok
884 */
885 PullSyncTest();
886 /**
887 * @tc.steps: step5. crud ok
888 * * @tc.expected: step5. interface return ok
889 */
890 CrudTest();
891 }
892
893 /**
894 * @tc.name: RemoveDeviceData002
895 * @tc.desc: test remove device data before sync
896 * @tc.type: FUNC
897 * @tc.require:
898 * @tc.author: zhuwentao
899 */
900 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RemoveDeviceData002, TestSize.Level1)
901 {
902 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
903 /**
904 * @tc.steps: step1. sync deviceB data to A and check data
905 * * @tc.expected: step1. interface return ok
906 */
907 Key key1 = {'1'};
908 Key key2 = {'2'};
909 Value value = {'1'};
910 Timestamp currentTime;
911 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
912 EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
913 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
914 EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
915 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
916 Value actualValue;
917 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
918 EXPECT_EQ(actualValue, value);
919 actualValue.clear();
920 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
921 EXPECT_EQ(actualValue, value);
922 /**
923 * @tc.steps: step2. call RemoveDeviceData
924 * * @tc.expected: step2. interface return ok
925 */
926 g_kvDelegatePtr->RemoveDeviceData(g_deviceB->GetDeviceId());
927 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
928 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), NOT_FOUND);
929 /**
930 * @tc.steps: step3. sync to device A again and check data
931 * * @tc.expected: step3. sync ok
932 */
933 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
934 actualValue.clear();
935 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
936 EXPECT_EQ(actualValue, value);
937 actualValue.clear();
938 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
939 EXPECT_EQ(actualValue, value);
940 }
941
942 /**
943 * @tc.name: DataSync001
944 * @tc.desc: Test Data Sync when Initialize
945 * @tc.type: FUNC
946 * @tc.require: AR000HI2JS
947 * @tc.author: zhuwentao
948 */
949 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync001, TestSize.Level1)
950 {
951 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
952 ASSERT_TRUE(dataSync != nullptr);
953 std::shared_ptr<Metadata> inMetadata = nullptr;
954 std::string deviceId;
955 Message message;
956 VirtualSingleVerSyncDBInterface tmpInterface;
957 VirtualCommunicator tmpCommunicator(deviceId, g_communicatorAggregator);
958 EXPECT_EQ(dataSync->Initialize(nullptr, nullptr, inMetadata, deviceId), -E_INVALID_ARGS);
959 EXPECT_EQ(dataSync->Initialize(&tmpInterface, nullptr, inMetadata, deviceId), -E_INVALID_ARGS);
960 EXPECT_EQ(dataSync->Initialize(&tmpInterface, &tmpCommunicator, inMetadata, deviceId), -E_INVALID_ARGS);
961 delete dataSync;
962 }
963
964 /**
965 * @tc.name: DataSync002
966 * @tc.desc: Test active sync with invalid param in DataSync Class
967 * @tc.type: FUNC
968 * @tc.require: AR000HI2JS
969 * @tc.author: zhuwentao
970 */
971 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync002, TestSize.Level1)
972 {
973 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
974 ASSERT_TRUE(dataSync != nullptr);
975 Message message;
976 EXPECT_EQ(dataSync->TryContinueSync(nullptr, &message), -E_INVALID_ARGS);
977 EXPECT_EQ(dataSync->TryContinueSync(nullptr, nullptr), -E_INVALID_ARGS);
978 EXPECT_EQ(dataSync->PushStart(nullptr), -E_INVALID_ARGS);
979 EXPECT_EQ(dataSync->PushPullStart(nullptr), -E_INVALID_ARGS);
980 EXPECT_EQ(dataSync->PullRequestStart(nullptr), -E_INVALID_ARGS);
981 EXPECT_EQ(dataSync->PullResponseStart(nullptr), -E_INVALID_ARGS);
982 delete dataSync;
983 }
984
985 /**
986 * @tc.name: DataSync003
987 * @tc.desc: Test receive invalid request data packet in DataSync Class
988 * @tc.type: FUNC
989 * @tc.require: AR000HI2JS
990 * @tc.author: zhuwentao
991 */
992 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync003, TestSize.Level1)
993 {
994 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
995 ASSERT_TRUE(dataSync != nullptr);
996 uint64_t tmpMark = 0;
997 Message message;
998 EXPECT_EQ(dataSync->DataRequestRecv(nullptr, nullptr, tmpMark), -E_INVALID_ARGS);
999 EXPECT_EQ(dataSync->DataRequestRecv(nullptr, &message, tmpMark), -E_INVALID_ARGS);
1000 delete dataSync;
1001 }
1002
1003 /**
1004 * @tc.name: DataSync004
1005 * @tc.desc: Test receive invalid ack packet in DataSync Class
1006 * @tc.type: FUNC
1007 * @tc.require: AR000HI2JS
1008 * @tc.author: zhuwentao
1009 */
1010 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync004, TestSize.Level1)
1011 {
1012 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1013 ASSERT_TRUE(dataSync != nullptr);
1014 Message message;
1015 TestSingleVerKvSyncTaskContext tmpContext;
1016 EXPECT_EQ(dataSync->AckPacketIdCheck(nullptr), false);
1017 EXPECT_EQ(dataSync->AckPacketIdCheck(&message), false);
1018 EXPECT_EQ(dataSync->AckRecv(&tmpContext, nullptr), -E_INVALID_ARGS);
1019 EXPECT_EQ(dataSync->AckRecv(nullptr, nullptr), -E_INVALID_ARGS);
1020 EXPECT_EQ(dataSync->AckRecv(nullptr, &message), -E_INVALID_ARGS);
1021 delete dataSync;
1022 }
1023
1024 /**
1025 * @tc.name: DataSync005
1026 * @tc.desc: Test receive invalid notify packet in DataSync Class
1027 * @tc.type: FUNC
1028 * @tc.require: AR000HI2JS
1029 * @tc.author: zhuwentao
1030 */
1031 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync005, TestSize.Level1)
1032 {
1033 ASSERT_NO_FATAL_FAILURE(DataSync005());
1034 }
1035
1036 /**
1037 * @tc.name: DataSync006
1038 * @tc.desc: Test control start with invalid param in DataSync Class
1039 * @tc.type: FUNC
1040 * @tc.require: AR000HI2JS
1041 * @tc.author: zhuwentao
1042 */
1043 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync006, TestSize.Level1)
1044 {
1045 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1046 ASSERT_TRUE(dataSync != nullptr);
1047 TestSingleVerKvSyncTaskContext tmpContext;
1048 EXPECT_EQ(dataSync->ControlCmdStart(nullptr), -E_INVALID_ARGS);
1049 EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_INVALID_ARGS);
1050 std::shared_ptr<SubscribeManager> subManager = std::make_shared<SubscribeManager>();
1051 tmpContext.SetSubscribeManager(subManager);
1052 tmpContext.SetMode(SyncModeType::INVALID_MODE);
1053 EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_INVALID_ARGS);
1054 std::set<Key> Keys = {{'a'}, {'b'}};
1055 Query query = Query::Select().InKeys(Keys);
1056 QuerySyncObject innerQuery(query);
1057 tmpContext.SetQuery(innerQuery);
1058 tmpContext.SetMode(SyncModeType::SUBSCRIBE_QUERY);
1059 EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_NOT_SUPPORT);
1060 delete dataSync;
1061 subManager = nullptr;
1062 }
1063
1064 /**
1065 * @tc.name: DataSync007
1066 * @tc.desc: Test receive invalid control packet in DataSync Class
1067 * @tc.type: FUNC
1068 * @tc.require: AR000HI2JS
1069 * @tc.author: zhuwentao
1070 */
1071 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync007, TestSize.Level1)
1072 {
1073 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1074 ASSERT_TRUE(dataSync != nullptr);
1075 Message message;
1076 ControlRequestPacket packet;
1077 TestSingleVerKvSyncTaskContext tmpContext;
1078 EXPECT_EQ(dataSync->ControlCmdRequestRecv(nullptr, &message), -E_INVALID_ARGS);
1079 message.SetCopiedObject(packet);
1080 EXPECT_EQ(dataSync->ControlCmdRequestRecv(nullptr, &message), -E_INVALID_ARGS);
1081 delete dataSync;
1082 }
1083
1084 /**
1085 * @tc.name: DataSync008
1086 * @tc.desc: Test pull null msg in dataQueue in DataSync Class
1087 * @tc.type: FUNC
1088 * @tc.require: AR000HI2JS
1089 * @tc.author: zhuwentao
1090 */
1091 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync008, TestSize.Level1)
1092 {
1093 ASSERT_NO_FATAL_FAILURE(DataSync008());
1094 }
1095
1096 /**
1097 * @tc.name: SyncRetry001
1098 * @tc.desc: use sync retry sync use push
1099 * @tc.type: FUNC
1100 * @tc.require: AR000CKRTD AR000CQE0E
1101 * @tc.author: zhuwentao
1102 */
1103 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry001, TestSize.Level3)
1104 {
1105 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1106 std::vector<std::string> devices;
1107 devices.push_back(g_deviceB->GetDeviceId());
1108
1109 /**
1110 * @tc.steps: step1. set sync retry
1111 * @tc.expected: step1, Pragma return OK.
1112 */
1113 int pragmaData = 1;
1114 PragmaData input = static_cast<PragmaData>(&pragmaData);
1115 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1116
1117 /**
1118 * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
1119 */
1120 ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
1121
1122 /**
1123 * @tc.steps: step3. deviceA call sync and wait
1124 * @tc.expected: step3. sync should return OK.
1125 */
1126 std::map<std::string, DBStatus> result;
1127 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1128
1129 /**
1130 * @tc.expected: step4. onComplete should be called, and status is time_out
1131 */
1132 ASSERT_TRUE(result.size() == devices.size());
1133 for (const auto &pair : result) {
1134 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1135 EXPECT_TRUE(pair.second == OK);
1136 }
1137 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1138 }
1139
1140 /**
1141 * @tc.name: SyncRetry002
1142 * @tc.desc: use sync retry sync use pull
1143 * @tc.type: FUNC
1144 * @tc.require: AR000CKRTD AR000CQE0E
1145 * @tc.author: zhuwentao
1146 */
1147 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry002, TestSize.Level3)
1148 {
1149 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE, 4u);
1150 std::vector<std::string> devices;
1151 devices.push_back(g_deviceB->GetDeviceId());
1152
1153 /**
1154 * @tc.steps: step1. set sync retry
1155 * @tc.expected: step1, Pragma return OK.
1156 */
1157 int pragmaData = 1;
1158 PragmaData input = static_cast<PragmaData>(&pragmaData);
1159 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1160
1161 /**
1162 * @tc.steps: step2. deviceA call sync and wait
1163 * @tc.expected: step2. sync should return OK.
1164 */
1165 std::map<std::string, DBStatus> result;
1166 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
1167
1168 /**
1169 * @tc.expected: step3. onComplete should be called, and status is time_out
1170 */
1171 ASSERT_TRUE(result.size() == devices.size());
1172 for (const auto &pair : result) {
1173 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1174 EXPECT_TRUE(pair.second == TIME_OUT);
1175 }
1176 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1177 }
1178
1179 /**
1180 * @tc.name: SyncRetry003
1181 * @tc.desc: use sync retry sync use push by compress
1182 * @tc.type: FUNC
1183 * @tc.require: AR000CKRTD AR000CQE0E
1184 * @tc.author: zhuwentao
1185 */
1186 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry003, TestSize.Level3)
1187 {
1188 if (g_kvDelegatePtr != nullptr) {
1189 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1190 g_kvDelegatePtr = nullptr;
1191 }
1192 /**
1193 * @tc.steps: step1. open db use Compress
1194 * @tc.expected: step1, Pragma return OK.
1195 */
1196 KvStoreNbDelegate::Option option;
1197 option.isNeedCompressOnSync = true;
1198 option.compressionRate = 70;
1199 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1200 ASSERT_TRUE(g_kvDelegateStatus == OK);
1201 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1202
1203 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1204 std::vector<std::string> devices;
1205 devices.push_back(g_deviceB->GetDeviceId());
1206
1207 /**
1208 * @tc.steps: step2. set sync retry
1209 * @tc.expected: step2, Pragma return OK.
1210 */
1211 int pragmaData = 1;
1212 PragmaData input = static_cast<PragmaData>(&pragmaData);
1213 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1214
1215 /**
1216 * @tc.steps: step3. deviceA put {k1, v1}, {k2, v2}
1217 */
1218 ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
1219
1220 /**
1221 * @tc.steps: step4. deviceA call sync and wait
1222 * @tc.expected: step4. sync should return OK.
1223 */
1224 std::map<std::string, DBStatus> result;
1225 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1226
1227 /**
1228 * @tc.expected: step5. onComplete should be called, and status is time_out
1229 */
1230 ASSERT_TRUE(result.size() == devices.size());
1231 for (const auto &pair : result) {
1232 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1233 EXPECT_TRUE(pair.second == OK);
1234 }
1235 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1236 }
1237
1238 /**
1239 * @tc.name: SyncRetry004
1240 * @tc.desc: use query sync retry sync use push
1241 * @tc.type: FUNC
1242 * @tc.require: AR000CKRTD AR000CQE0E
1243 * @tc.author: zhuwentao
1244 */
1245 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry004, TestSize.Level3)
1246 {
1247 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1248 std::vector<std::string> devices;
1249 devices.push_back(g_deviceB->GetDeviceId());
1250
1251 /**
1252 * @tc.steps: step1. set sync retry
1253 * @tc.expected: step1, Pragma return OK.
1254 */
1255 int pragmaData = 1;
1256 PragmaData input = static_cast<PragmaData>(&pragmaData);
1257 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1258
1259 /**
1260 * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
1261 */
1262 for (int i = 0; i < 5; i++) {
1263 Key key = DistributedDBToolsUnitTest::GetRandPrefixKey({'a', 'b'}, 128); // rand num 1024 for test
1264 Value value;
1265 DistributedDBToolsUnitTest::GetRandomKeyValue(value, 256u);
1266 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
1267 }
1268
1269 /**
1270 * @tc.steps: step3. deviceA call sync and wait
1271 * @tc.expected: step3. sync should return OK.
1272 */
1273 std::map<std::string, DBStatus> result;
1274 std::vector<uint8_t> prefixKey({'a', 'b'});
1275 Query query = Query::Select().PrefixKey(prefixKey);
1276 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query) == OK);
1277
1278 /**
1279 * @tc.expected: step4. onComplete should be called, and status is time_out
1280 */
1281 ASSERT_TRUE(result.size() == devices.size());
1282 for (const auto &pair : result) {
1283 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1284 EXPECT_TRUE(pair.second == OK);
1285 }
1286 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1287 }
1288
1289 /**
1290 * @tc.name: SyncRetry005
1291 * @tc.desc: use sync retry sync use pull by compress
1292 * @tc.type: FUNC
1293 * @tc.require: AR000CKRTD AR000CQE0E
1294 * @tc.author: zhangqiquan
1295 */
1296 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry005, TestSize.Level3)
1297 {
1298 if (g_kvDelegatePtr != nullptr) {
1299 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1300 g_kvDelegatePtr = nullptr;
1301 }
1302 /**
1303 * @tc.steps: step1. open db use Compress
1304 * @tc.expected: step1, Pragma return OK.
1305 */
1306 KvStoreNbDelegate::Option option;
1307 option.isNeedCompressOnSync = true;
1308 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1309 ASSERT_TRUE(g_kvDelegateStatus == OK);
1310 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1311
1312 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1313 std::vector<std::string> devices;
1314 devices.push_back(g_deviceB->GetDeviceId());
1315
1316 /**
1317 * @tc.steps: step2. set sync retry
1318 * @tc.expected: step2, Pragma return OK.
1319 */
1320 int pragmaData = 1;
1321 PragmaData input = static_cast<PragmaData>(&pragmaData);
1322 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1323
1324 /**
1325 * @tc.steps: step3. deviceA call sync and wait
1326 * @tc.expected: step3. sync should return OK.
1327 */
1328 std::map<std::string, DBStatus> result;
1329 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
1330
1331 /**
1332 * @tc.expected: step4. onComplete should be called, and status is time_out
1333 */
1334 ASSERT_TRUE(result.size() == devices.size());
1335 for (const auto &pair : result) {
1336 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1337 EXPECT_EQ(pair.second, OK);
1338 }
1339 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1340 }
1341
1342 /**
1343 * @tc.name: ReSetWatchDogTest001
1344 * @tc.desc: trigger resetWatchDog while pull
1345 * @tc.type: FUNC
1346 * @tc.require: AR000CKRTD AR000CQE0E
1347 * @tc.author: zhuwentao
1348 */
1349 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, ReSetWaterDogTest001, TestSize.Level3)
1350 {
1351 ASSERT_NO_FATAL_FAILURE(ReSetWaterDogTest001());
1352 }
1353
1354 /**
1355 * @tc.name: RebuildSync001
1356 * @tc.desc: rebuild db and sync again
1357 * @tc.type: FUNC
1358 * @tc.require:
1359 * @tc.author: zhuwentao
1360 */
1361 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync001, TestSize.Level3)
1362 {
1363 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1364 /**
1365 * @tc.steps: step1. sync deviceB data to A and check data
1366 * * @tc.expected: step1. interface return ok
1367 */
1368 Key key1 = {'1'};
1369 Key key2 = {'2'};
1370 Value value = {'1'};
1371 Timestamp currentTime;
1372 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1373 EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1374 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1375 EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
1376 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1377
1378 Value actualValue;
1379 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1380 EXPECT_EQ(actualValue, value);
1381 actualValue.clear();
1382 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1383 EXPECT_EQ(actualValue, value);
1384 /**
1385 * @tc.steps: step2. delete db and rebuild
1386 * * @tc.expected: step2. interface return ok
1387 */
1388 g_mgr.CloseKvStore(g_kvDelegatePtr);
1389 g_kvDelegatePtr = nullptr;
1390 ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1391 KvStoreNbDelegate::Option option;
1392 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1393 ASSERT_TRUE(g_kvDelegateStatus == OK);
1394 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1395 /**
1396 * @tc.steps: step3. sync to device A again
1397 * * @tc.expected: step3. sync ok
1398 */
1399 value = {'2'};
1400 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1401 EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1402 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1403 /**
1404 * @tc.steps: step4. check data in device A
1405 * * @tc.expected: step4. check ok
1406 */
1407 actualValue.clear();
1408 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1409 EXPECT_EQ(actualValue, value);
1410 }
1411
1412 /**
1413 * @tc.name: RebuildSync002
1414 * @tc.desc: test clear remote data when receive data
1415 * @tc.type: FUNC
1416 * @tc.require:
1417 * @tc.author: zhuwentao
1418 */
1419 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync002, TestSize.Level1)
1420 {
1421 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1422 std::vector<std::string> devices;
1423 devices.push_back(g_deviceB->GetDeviceId());
1424 /**
1425 * @tc.steps: step1. device A SET_WIPE_POLICY
1426 * * @tc.expected: step1. interface return ok
1427 */
1428 int pragmaData = 2; // 2 means enable
1429 PragmaData input = static_cast<PragmaData>(&pragmaData);
1430 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_WIPE_POLICY, input) == OK);
1431 /**
1432 * @tc.steps: step2. sync deviceB data to A and check data
1433 * * @tc.expected: step2. interface return ok
1434 */
1435 Key key1 = {'1'};
1436 Key key2 = {'2'};
1437 Key key3 = {'3'};
1438 Key key4 = {'4'};
1439 Value value = {'1'};
1440 Timestamp currentTime;
1441 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1442 EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1443 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1444 EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
1445 EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
1446 /**
1447 * @tc.steps: step3. deviceA call pull sync
1448 * @tc.expected: step3. sync should return OK.
1449 */
1450 std::map<std::string, DBStatus> result;
1451 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result) == OK);
1452
1453 /**
1454 * @tc.expected: step4. onComplete should be called, check data
1455 */
1456 ASSERT_TRUE(result.size() == devices.size());
1457 for (const auto &pair : result) {
1458 EXPECT_TRUE(pair.second == OK);
1459 }
1460 Value actualValue;
1461 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1462 EXPECT_EQ(actualValue, value);
1463 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1464 EXPECT_EQ(actualValue, value);
1465 /**
1466 * @tc.steps: step5. device B rebuild and put some data
1467 * * @tc.expected: step5. rebuild ok
1468 */
1469 if (g_deviceB != nullptr) {
1470 delete g_deviceB;
1471 g_deviceB = nullptr;
1472 }
1473 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
1474 ASSERT_TRUE(g_deviceB != nullptr);
1475 VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
1476 ASSERT_TRUE(syncInterfaceB != nullptr);
1477 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
1478 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1479 EXPECT_EQ(g_deviceB->PutData(key3, value, currentTime, 0), E_OK);
1480 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1481 EXPECT_EQ(g_deviceB->PutData(key4, value, currentTime, 0), E_OK);
1482 /**
1483 * @tc.steps: step6. sync to device A again and check data
1484 * * @tc.expected: step6. sync ok
1485 */
1486 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1487 EXPECT_EQ(g_kvDelegatePtr->Get(key3, actualValue), OK);
1488 EXPECT_EQ(actualValue, value);
1489 EXPECT_EQ(g_kvDelegatePtr->Get(key4, actualValue), OK);
1490 EXPECT_EQ(actualValue, value);
1491 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1492 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), NOT_FOUND);
1493 }
1494
1495 /**
1496 * @tc.name: RebuildSync003
1497 * @tc.desc: test clear history data when receive ack
1498 * @tc.type: FUNC
1499 * @tc.require:
1500 * @tc.author: zhuwentao
1501 */
1502 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync003, TestSize.Level1)
1503 {
1504 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1505 /**
1506 * @tc.steps: step1. sync deviceB data to A and check data
1507 * * @tc.expected: step1. interface return ok
1508 */
1509 Key key1 = {'1'};
1510 Key key2 = {'2'};
1511 Key key3 = {'3'};
1512 Key key4 = {'4'};
1513 Value value = {'1'};
1514 EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1515 EXPECT_EQ(g_deviceB->PutData(key2, value, 2u, 0), E_OK); // 2: timestamp
1516 EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
1517 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1518 Value actualValue;
1519 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1520 EXPECT_EQ(actualValue, value);
1521 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1522 EXPECT_EQ(actualValue, value);
1523 VirtualDataItem item;
1524 EXPECT_EQ(g_deviceB->GetData(key3, item), E_OK);
1525 EXPECT_EQ(item.value, value);
1526 /**
1527 * @tc.steps: step2. device B sync to device A,but make it failed
1528 * * @tc.expected: step2. interface return ok
1529 */
1530 EXPECT_EQ(g_deviceB->PutData(key4, value, 3u, 0), E_OK); // 3: timestamp
1531 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_A, DATA_SYNC_MESSAGE);
1532 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1533 /**
1534 * @tc.steps: step3. device B set delay send time
1535 * * @tc.expected: step3. interface return ok
1536 */
1537 std::set<std::string> delayDevice = {DEVICE_B};
1538 g_communicatorAggregator->SetSendDelayInfo(3000u, DATA_SYNC_MESSAGE, 1u, 0u, delayDevice); // delay 3000ms one time
1539 /**
1540 * @tc.steps: step4. device A rebuilt, device B push data to A and set clear remote data mark into context after 1s
1541 * * @tc.expected: step4. interface return ok
1542 */
1543 g_deviceB->SetClearRemoteStaleData(true);
1544 g_mgr.CloseKvStore(g_kvDelegatePtr);
1545 g_kvDelegatePtr = nullptr;
1546 ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1547 KvStoreNbDelegate::Option option;
1548 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1549 ASSERT_TRUE(g_kvDelegateStatus == OK);
1550 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1551 std::map<std::string, DBStatus> result;
1552 std::vector<std::string> devices = {g_deviceB->GetDeviceId()};
1553 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1554 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1555 /**
1556 * @tc.steps: step5. device B sync to A, make it clear history data and check data
1557 * * @tc.expected: step5. interface return ok
1558 */
1559 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1560 EXPECT_EQ(g_deviceB->GetData(key3, item), -E_NOT_FOUND);
1561 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1562 EXPECT_EQ(actualValue, value);
1563 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1564 EXPECT_EQ(actualValue, value);
1565 g_communicatorAggregator->ResetSendDelayInfo();
1566 }
1567
1568 /**
1569 * @tc.name: RemoveDeviceData001
1570 * @tc.desc: call rekey and removeDeviceData Concurrently
1571 * @tc.type: FUNC
1572 * @tc.require: AR000D487B
1573 * @tc.author: zhuwentao
1574 */
1575 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RemoveDeviceData001, TestSize.Level1)
1576 {
1577 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1578 /**
1579 * @tc.steps: step1. sync deviceB data to A
1580 * * @tc.expected: step1. interface return ok
1581 */
1582 Key key1 = {'1'};
1583 Key key2 = {'2'};
1584 Value value = {'1'};
1585 g_deviceB->PutData(key1, value, 1, 0);
1586 g_deviceB->PutData(key2, value, 2, 0);
1587 g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true);
1588
1589 Value actualValue;
1590 g_kvDelegatePtr->Get(key1, actualValue);
1591 EXPECT_EQ(actualValue, value);
1592 actualValue.clear();
1593 g_kvDelegatePtr->Get(key2, actualValue);
1594 EXPECT_EQ(actualValue, value);
1595 /**
1596 * @tc.steps: step2. call Rekey and RemoveDeviceData Concurrently
1597 * * @tc.expected: step2. interface return ok
1598 */
__anon7f2dc5a01302() 1599 std::thread thread1([]() {
1600 CipherPassword passwd3;
1601 std::vector<uint8_t> passwdVect = {'p', 's', 'd', 'z'};
1602 passwd3.SetValue(passwdVect.data(), passwdVect.size());
1603 g_kvDelegatePtr->Rekey(passwd3);
1604 });
__anon7f2dc5a01402() 1605 std::thread thread2([]() {
1606 g_kvDelegatePtr->RemoveDeviceData(g_deviceB->GetDeviceId());
1607 });
1608 thread1.join();
1609 thread2.join();
1610 }
1611
1612 /**
1613 * @tc.name: DeviceOfflineSyncTask001
1614 * @tc.desc: Test sync task when device offline and close db Concurrently
1615 * @tc.type: FUNC
1616 * @tc.require: AR000HI2JS
1617 * @tc.author: zhuwentao
1618 */
1619 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask001, TestSize.Level3)
1620 {
1621 DBStatus status = OK;
1622 std::vector<std::string> devices;
1623 devices.push_back(g_deviceB->GetDeviceId());
1624
1625 /**
1626 * @tc.steps: step1. deviceA put {k1, v1}
1627 */
1628 Key key = {'1'};
1629 Value value = {'1'};
1630 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1631
1632 /**
1633 * @tc.steps: step2. deviceA set auto sync and put some key/value
1634 * @tc.expected: step2. interface should return OK.
1635 */
1636 bool autoSync = true;
1637 PragmaData data = static_cast<PragmaData>(&autoSync);
1638 status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1639 ASSERT_EQ(status, OK);
1640
1641 Key key1 = {'2'};
1642 Key key2 = {'3'};
1643 Key key3 = {'4'};
1644 Key key4 = {'5'};
1645 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1646 ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value) == OK);
1647 ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value) == OK);
1648 ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value) == OK);
1649 ASSERT_TRUE(g_kvDelegatePtr->Put(key4, value) == OK);
1650 /**
1651 * @tc.steps: step3. device offline and close db Concurrently
1652 * @tc.expected: step3. interface should return OK.
1653 */
__anon7f2dc5a01502() 1654 std::thread thread1([]() {
1655 g_mgr.CloseKvStore(g_kvDelegatePtr);
1656 g_kvDelegatePtr = nullptr;
1657 });
__anon7f2dc5a01602() 1658 std::thread thread2([]() {
1659 g_deviceB->Offline();
1660 });
1661 thread1.join();
1662 thread2.join();
1663 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1664 ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1665 }
1666
1667 /**
1668 * @tc.name: DeviceOfflineSyncTask002
1669 * @tc.desc: Test sync task when autoSync and close db Concurrently
1670 * @tc.type: FUNC
1671 * @tc.require:
1672 * @tc.author: zhuwentao
1673 */
1674 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask002, TestSize.Level3)
1675 {
1676 DBStatus status = OK;
1677 g_deviceC->Offline();
1678
1679 /**
1680 * @tc.steps: step1. deviceA put {k1, v1}
1681 */
1682 Key key = {'1'};
1683 Value value = {'1'};
1684 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1685
1686 /**
1687 * @tc.steps: step2. deviceA set auto sync and put some key/value
1688 * @tc.expected: step2. interface should return OK.
1689 */
1690 bool autoSync = true;
1691 PragmaData data = static_cast<PragmaData>(&autoSync);
1692 status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1693 ASSERT_EQ(status, OK);
1694 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME * 2));
1695
1696 Key key1 = {'2'};
1697 Key key2 = {'3'};
1698 Key key3 = {'4'};
1699 ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value) == OK);
1700 ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value) == OK);
1701 ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value) == OK);
1702 /**
1703 * @tc.steps: step3. close db
1704 * @tc.expected: step3. interface should return OK.
1705 */
1706 g_mgr.CloseKvStore(g_kvDelegatePtr);
1707 g_kvDelegatePtr = nullptr;
1708 ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1709 }
1710
1711 /**
1712 * @tc.name: DeviceOfflineSyncTask003
1713 * @tc.desc: Test sync task when device offline after call sync
1714 * @tc.type: FUNC
1715 * @tc.require:
1716 * @tc.author: zhuwentao
1717 */
1718 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask003, TestSize.Level3)
1719 {
1720 std::vector<std::string> devices;
1721 devices.push_back(g_deviceB->GetDeviceId());
1722
1723 /**
1724 * @tc.steps: step1. deviceA put {k1, v1}
1725 */
1726 Key key = {'1'};
1727 Value value = {'1'};
1728 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1729 /**
1730 * @tc.steps: step2. device offline after call sync
1731 * @tc.expected: step2. interface should return OK.
1732 */
1733 Query query = Query::Select().PrefixKey(key);
1734 ASSERT_TRUE(g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, query, false) == OK);
1735 std::this_thread::sleep_for(std::chrono::milliseconds(15)); // wait for 15ms
1736 g_deviceB->Offline();
1737 }
1738
1739 /**
1740 * @tc.name: GetSyncDataFail001
1741 * @tc.desc: test get sync data failed when sync
1742 * @tc.type: FUNC
1743 * @tc.require:
1744 * @tc.author: zhuwentao
1745 */
1746 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail001, TestSize.Level1)
1747 {
1748 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1749 /**
1750 * @tc.steps: step1. device B set get data errCode control and put some data
1751 * * @tc.expected: step1. interface return ok
1752 */
1753 g_deviceB->SetGetDataErrCode(1, -E_BUSY, true);
1754 Key key1 = {'1'};
1755 Value value = {'1'};
1756 EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1757 /**
1758 * @tc.steps: step2. device B sync to device A and check data
1759 * * @tc.expected: step2. interface return ok
1760 */
1761 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1762 Value actualValue;
1763 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1764 g_deviceB->ResetDataControl();
1765 }
1766
1767 /**
1768 * @tc.name: GetSyncDataFail002
1769 * @tc.desc: test get sync data failed when sync with large data
1770 * @tc.type: FUNC
1771 * @tc.require: AR000D487B
1772 * @tc.author: zhuwentao
1773 */
1774 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail002, TestSize.Level1)
1775 {
1776 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1777 /**
1778 * @tc.steps: step1. device B set get data errCode control and put some data
1779 * * @tc.expected: step1. interface return ok
1780 */
1781 g_deviceB->SetGetDataErrCode(2, -E_BUSY, true);
1782 int totalSize = 4000u;
1783 std::vector<Entry> entries;
1784 std::vector<Key> keys;
1785 const int keyLen = 10; // 20 Bytes
1786 const int valueLen = 10; // 20 Bytes
1787 DistributedDBUnitTest::GenerateRecords(totalSize, entries, keys, keyLen, valueLen);
1788 uint32_t i = 1u;
1789 for (const auto &entry : entries) {
1790 EXPECT_EQ(g_deviceB->PutData(entry.key, entry.value, i, 0), E_OK);
1791 i++;
1792 }
1793 /**
1794 * @tc.steps: step2. device B sync to device A and check data
1795 * * @tc.expected: step2. interface return ok
1796 */
1797 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1798 std::this_thread::sleep_for(std::chrono::seconds(1));
1799 Value actualValue;
1800 for (int j = 1u; j <= totalSize; j++) {
1801 if (j > totalSize / 2) {
1802 EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), NOT_FOUND);
1803 } else {
1804 EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), OK);
1805 }
1806 }
1807 g_deviceB->ResetDataControl();
1808 }
1809
1810 /**
1811 * @tc.name: GetSyncDataFail003
1812 * @tc.desc: test get sync data E_EKEYREVOKED failed in push_and_pull sync
1813 * @tc.type: FUNC
1814 * @tc.require:
1815 * @tc.author: zhuwentao
1816 */
1817 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail003, TestSize.Level1)
1818 {
1819 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1820 /**
1821 * @tc.steps: step1. device B set get data errCode control and put some data
1822 * * @tc.expected: step1. interface return ok
1823 */
1824 g_deviceB->SetGetDataErrCode(1, -E_EKEYREVOKED, true);
1825 Key key1 = {'1'};
1826 Key key2 = {'3'};
1827 Value value = {'1'};
1828 EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1829 EXPECT_EQ(g_kvDelegatePtr->Put(key2, value), OK);
1830 /**
1831 * @tc.steps: step2. device B sync to device A and check data
1832 * * @tc.expected: step2. interface return ok
1833 */
1834 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1835 Value actualValue;
1836 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1837 VirtualDataItem item;
1838 EXPECT_EQ(g_deviceB->GetData(key2, item), E_OK);
1839 g_deviceB->ResetDataControl();
1840 }
1841
1842 /**
1843 * @tc.name: GetSyncDataFail004
1844 * @tc.desc: test get sync data E_EKEYREVOKED failed in push_and_pull sync
1845 * @tc.type: FUNC
1846 * @tc.require:
1847 * @tc.author: zhuwentao
1848 */
1849 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail004, TestSize.Level1)
1850 {
1851 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1852 /**
1853 * @tc.steps: step1. device B set get data errCode control and put some data
1854 * * @tc.expected: step1. interface return ok
1855 */
1856 g_deviceB->SetGetDataErrCode(2, -E_EKEYREVOKED, true);
1857 int totalSize = 4000u;
1858 std::vector<Entry> entries;
1859 std::vector<Key> keys;
1860 const int keyLen = 10; // 20 Bytes
1861 const int valueLen = 10; // 20 Bytes
1862 DistributedDBUnitTest::GenerateRecords(totalSize, entries, keys, keyLen, valueLen);
1863 uint32_t i = 1u;
1864 for (const auto &entry : entries) {
1865 EXPECT_EQ(g_deviceB->PutData(entry.key, entry.value, i, 0), E_OK);
1866 i++;
1867 }
1868 Key key = {'a', 'b', 'c'};
1869 Value value = {'1'};
1870 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
1871 /**
1872 * @tc.steps: step2. device B sync to device A and check data
1873 * * @tc.expected: step2. interface return ok
1874 */
1875 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1876 std::this_thread::sleep_for(std::chrono::seconds(1));
1877 Value actualValue;
1878 for (int j = 1u; j <= totalSize; j++) {
1879 if (j > totalSize / 2) {
1880 EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), NOT_FOUND);
1881 } else {
1882 EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), OK);
1883 }
1884 }
1885 VirtualDataItem item;
1886 EXPECT_EQ(g_deviceB->GetData(key, item), E_OK);
1887 g_deviceB->ResetDataControl();
1888 }
1889
1890 /**
1891 * @tc.name: InterceptDataFail001
1892 * @tc.desc: test intercept data failed when sync
1893 * @tc.type: FUNC
1894 * @tc.require:
1895 * @tc.author: zhuwentao
1896 */
1897 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, InterceptDataFail001, TestSize.Level1)
1898 {
1899 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1900 /**
1901 * @tc.steps: step1. device A set intercept data errCode and put some data
1902 * * @tc.expected: step1. interface return ok
1903 */
1904 g_kvDelegatePtr->SetPushDataInterceptor(
__anon7f2dc5a01702(InterceptedData &data, const std::string &sourceID, const std::string &targetID) 1905 [](InterceptedData &data, const std::string &sourceID, const std::string &targetID) {
1906 int errCode = OK;
1907 auto entries = data.GetEntries();
1908 LOGD("====here111,size=%d", entries.size());
1909 for (size_t i = 0; i < entries.size(); i++) {
1910 Key newKey;
1911 errCode = data.ModifyKey(i, newKey);
1912 if (errCode != OK) {
1913 break;
1914 }
1915 }
1916 return errCode;
1917 }
1918 );
1919 Key key = {'1'};
1920 Value value = {'1'};
1921 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
1922 /**
1923 * @tc.steps: step2. device A sync to device B and check data
1924 * * @tc.expected: step2. interface return ok
1925 */
1926 std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
1927 std::map<std::string, DBStatus> result;
1928 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1929 ASSERT_TRUE(result.size() == devices.size());
1930 for (const auto &pair : result) {
1931 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1932 EXPECT_TRUE(pair.second == INTERCEPT_DATA_FAIL);
1933 }
1934 VirtualDataItem item;
1935 EXPECT_EQ(g_deviceB->GetData(key, item), -E_NOT_FOUND);
1936 }
1937
1938 /**
1939 * @tc.name: UpdateKey001
1940 * @tc.desc: test update key can effect local data and sync data, without delete data
1941 * @tc.type: FUNC
1942 * @tc.require:
1943 * @tc.author: zhangqiquan
1944 */
1945 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, UpdateKey001, TestSize.Level1)
1946 {
1947 /**
1948 * @tc.steps: step1. device A set sync data (k1, v1) local data (k2, v2) (k3, v3) and delete (k4, v4)
1949 * @tc.expected: step1. put data return ok
1950 */
1951 Key k1 = {'k', '1'};
1952 Value v1 = {'v', '1'};
1953 g_deviceB->PutData(k1, v1, 1, 0);
1954 ASSERT_EQ(g_deviceB->Sync(SyncMode::SYNC_MODE_PUSH_ONLY, true), E_OK);
1955 Value actualValue;
1956 EXPECT_EQ(g_kvDelegatePtr->Get(k1, actualValue), OK);
1957 EXPECT_EQ(v1, actualValue);
1958 Key k2 = {'k', '2'};
1959 Value v2 = {'v', '2'};
1960 Key k3 = {'k', '3'};
1961 Value v3 = {'v', '3'};
1962 Key k4 = {'k', '4'};
1963 Value v4 = {'v', '4'};
1964 EXPECT_EQ(g_kvDelegatePtr->Put(k2, v2), OK);
1965 EXPECT_EQ(g_kvDelegatePtr->Put(k3, v3), OK);
1966 EXPECT_EQ(g_kvDelegatePtr->Put(k4, v4), OK);
1967 EXPECT_EQ(g_kvDelegatePtr->Delete(k4), OK);
1968 /**
1969 * @tc.steps: step2. device A update key and set
1970 * @tc.expected: step2. put data return ok
1971 */
__anon7f2dc5a01802(const Key &originKey, Key &newKey) 1972 DBStatus status = g_kvDelegatePtr->UpdateKey([](const Key &originKey, Key &newKey) {
1973 newKey = originKey;
1974 newKey.push_back('0');
1975 });
1976 EXPECT_EQ(status, OK);
1977 k1.push_back('0');
1978 k2.push_back('0');
1979 k3.push_back('0');
1980 EXPECT_EQ(g_kvDelegatePtr->Get(k1, actualValue), OK);
1981 EXPECT_EQ(v1, actualValue);
1982 EXPECT_EQ(g_kvDelegatePtr->Get(k2, actualValue), OK);
1983 EXPECT_EQ(v2, actualValue);
1984 EXPECT_EQ(g_kvDelegatePtr->Get(k3, actualValue), OK);
1985 EXPECT_EQ(v3, actualValue);
1986 }
1987
1988 /**
1989 * @tc.name: MetaBusy001
1990 * @tc.desc: test sync normal when update water mark busy
1991 * @tc.type: FUNC
1992 * @tc.require:
1993 * @tc.author: zhangqiquan
1994 */
1995 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, MetaBusy001, TestSize.Level1)
1996 {
1997 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1998 Key key = {'1'};
1999 Value value = {'1'};
2000 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
2001 std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2002 std::map<std::string, DBStatus> result;
2003 ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result), OK);
2004 ASSERT_EQ(result.size(), devices.size());
2005 for (const auto &pair : result) {
2006 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2007 EXPECT_TRUE(pair.second == OK);
2008 }
2009 value = {'2'};
2010 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
__anon7f2dc5a01902() 2011 g_deviceB->SetSaveDataCallback([] () {
2012 RuntimeContext::GetInstance()->ScheduleTask([]() {
2013 g_deviceB->EraseWaterMark("real_device");
2014 });
2015 std::this_thread::sleep_for(std::chrono::seconds(1));
2016 });
2017 EXPECT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result), OK);
2018 EXPECT_EQ(result.size(), devices.size());
2019 for (const auto &pair : result) {
2020 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2021 EXPECT_TRUE(pair.second == OK);
2022 }
2023 g_deviceB->SetSaveDataCallback(nullptr);
2024 RuntimeContext::GetInstance()->StopTaskPool();
2025 }