• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <condition_variable>
17 #include <gtest/gtest.h>
18 #include <thread>
19 
20 #include "db_constant.h"
21 #include "db_common.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "mock_sync_task_context.h"
27 #include "platform_specific.h"
28 #include "single_ver_data_sync.h"
29 #include "single_ver_kv_sync_task_context.h"
30 
31 using namespace testing::ext;
32 using namespace DistributedDB;
33 using namespace DistributedDBUnitTest;
34 using namespace std;
35 
36 namespace {
37     class TestSingleVerKvSyncTaskContext : public SingleVerKvSyncTaskContext {
38     public:
39         TestSingleVerKvSyncTaskContext() = default;
40     };
41     string g_testDir;
42     const string STORE_ID = "kv_stroe_complex_sync_test";
43     const int WAIT_TIME = 1000;
44     const std::string DEVICE_A = "real_device";
45     const std::string DEVICE_B = "deviceB";
46     const std::string DEVICE_C = "deviceC";
47     const std::string CREATE_SYNC_TABLE_SQL =
48     "CREATE TABLE IF NOT EXISTS sync_data(" \
49         "key         BLOB NOT NULL," \
50         "value       BLOB," \
51         "timestamp   INT  NOT NULL," \
52         "flag        INT  NOT NULL," \
53         "device      BLOB," \
54         "ori_device  BLOB," \
55         "hash_key    BLOB PRIMARY KEY NOT NULL," \
56         "w_timestamp INT," \
57         "modify_time INT," \
58         "create_time INT" \
59         ");";
60 
61     KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
62     KvStoreConfig g_config;
63     DistributedDBToolsUnitTest g_tool;
64     DBStatus g_kvDelegateStatus = INVALID_ARGS;
65     KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
66     VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
67     KvVirtualDevice *g_deviceB = nullptr;
68     KvVirtualDevice *g_deviceC = nullptr;
69 
70     // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
71     auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
72         placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
73 
PullSyncTest()74     void PullSyncTest()
75     {
76         DBStatus status = OK;
77         std::vector<std::string> devices;
78         devices.push_back(g_deviceB->GetDeviceId());
79 
80         Key key = {'1'};
81         Key key2 = {'2'};
82         Value value = {'1'};
83         g_deviceB->PutData(key, value, 0, 0);
84         g_deviceB->PutData(key2, value, 1, 0);
85 
86         std::map<std::string, DBStatus> result;
87         status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
88         ASSERT_TRUE(status == OK);
89 
90         ASSERT_TRUE(result.size() == devices.size());
91         for (const auto &pair : result) {
92             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
93             EXPECT_TRUE(pair.second == OK);
94         }
95         Value value3;
96         EXPECT_EQ(g_kvDelegatePtr->Get(key, value3), OK);
97         EXPECT_EQ(value3, value);
98         EXPECT_EQ(g_kvDelegatePtr->Get(key2, value3), OK);
99         EXPECT_EQ(value3, value);
100     }
101 
CrudTest()102     void CrudTest()
103     {
104         vector<Entry> entries;
105         int totalSize = 10;
106         for (int i = 0; i < totalSize; i++) {
107             Entry entry;
108             entry.key.push_back(i);
109             entry.value.push_back('2');
110             entries.push_back(entry);
111         }
112         EXPECT_TRUE(g_kvDelegatePtr->PutBatch(entries) == OK);
113         for (const auto &entry : entries) {
114             Value resultvalue;
115             EXPECT_TRUE(g_kvDelegatePtr->Get(entry.key, resultvalue) == OK);
116             EXPECT_TRUE(resultvalue == entry.value);
117         }
118         for (int i = 0; i < totalSize / 2; i++) { // 2: Half of the total
119             g_kvDelegatePtr->Delete(entries[i].key);
120             Value resultvalue;
121             EXPECT_TRUE(g_kvDelegatePtr->Get(entries[i].key, resultvalue) == NOT_FOUND);
122         }
123         for (int i = totalSize / 2; i < totalSize; i++) {
124             Value value = entries[i].value;
125             value.push_back('x');
126             EXPECT_TRUE(g_kvDelegatePtr->Put(entries[i].key, value) == OK);
127             Value resultvalue;
128             EXPECT_TRUE(g_kvDelegatePtr->Get(entries[i].key, resultvalue) == OK);
129             EXPECT_TRUE(resultvalue == value);
130         }
131     }
132 
DataSync005()133     void DataSync005()
134     {
135         ASSERT_NE(g_communicatorAggregator, nullptr);
136         SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
137         ASSERT_TRUE(dataSync != nullptr);
138         dataSync->SendSaveDataNotifyPacket(nullptr, 0, 0, 0, TIME_SYNC_MESSAGE);
139         EXPECT_EQ(g_communicatorAggregator->GetOnlineDevices().size(), 3u); // 3 online dev
140         delete dataSync;
141     }
142 
DataSync008()143     void DataSync008()
144     {
145         SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
146         ASSERT_TRUE(dataSync != nullptr);
147         auto context = new (std::nothrow) MockSyncTaskContext();
148         dataSync->PutDataMsg(nullptr);
149         bool isNeedHandle = false;
150         bool isContinue = false;
151         EXPECT_EQ(dataSync->MoveNextDataMsg(context, isNeedHandle, isContinue), nullptr);
152         EXPECT_EQ(isNeedHandle, false);
153         EXPECT_EQ(isContinue, false);
154         delete dataSync;
155         delete context;
156     }
157 
ReSetWaterDogTest001()158     void ReSetWaterDogTest001()
159     {
160         /**
161          * @tc.steps: step1. put 10 key/value
162          * @tc.expected: step1, put return OK.
163          */
164         for (int i = 0; i < 5; i++) { // put 5 key
165             Key key = DistributedDBToolsUnitTest::GetRandPrefixKey({'a', 'b'}, 1024); // rand num 1024 for test
166             Value value;
167             DistributedDBToolsUnitTest::GetRandomKeyValue(value, 10 * 50 * 1024u); // 10 * 50 * 1024 = 500k
168             EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
169         }
170         /**
171          * @tc.steps: step2. SetDeviceMtuSize
172          * @tc.expected: step2, return OK.
173          */
174         g_communicatorAggregator->SetDeviceMtuSize(DEVICE_A, 50 * 1024u); // 50 * 1024u = 50k
175         g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, 50 * 1024u); // 50 * 1024u = 50k
176         /**
177          * @tc.steps: step3. deviceA,deviceB sync to each other at same time
178          * @tc.expected: step3. sync should return OK.
179          */
180         EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, true), E_OK);
181         g_communicatorAggregator->SetDeviceMtuSize(DEVICE_A, 5 * 1024u * 1024u); // 5 * 1024u * 1024u = 5m
182         g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, 5 * 1024u * 1024u); // 5 * 1024u * 1024u = 5m
183     }
184 }
185 
186 class DistributedDBSingleVerP2PComplexSyncTest : public testing::Test {
187 public:
188     static void SetUpTestCase(void);
189     static void TearDownTestCase(void);
190     void SetUp();
191     void TearDown();
192 };
193 
SetUpTestCase(void)194 void DistributedDBSingleVerP2PComplexSyncTest::SetUpTestCase(void)
195 {
196     /**
197      * @tc.setup: Init datadir and Virtual Communicator.
198      */
199     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
200     g_config.dataDir = g_testDir;
201     g_mgr.SetKvStoreConfig(g_config);
202 
203     string dir = g_testDir + "/single_ver";
204     DIR* dirTmp = opendir(dir.c_str());
205     if (dirTmp == nullptr) {
206         OS::MakeDBDirectory(dir);
207     } else {
208         closedir(dirTmp);
209     }
210 
211     g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
212     ASSERT_TRUE(g_communicatorAggregator != nullptr);
213     RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
214 }
215 
TearDownTestCase(void)216 void DistributedDBSingleVerP2PComplexSyncTest::TearDownTestCase(void)
217 {
218     /**
219      * @tc.teardown: Release virtual Communicator and clear data dir.
220      */
221     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
222         LOGE("rm test db files error!");
223     }
224     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
225 }
226 
SetUp(void)227 void DistributedDBSingleVerP2PComplexSyncTest::SetUp(void)
228 {
229     DistributedDBToolsUnitTest::PrintTestCaseInfo();
230     /**
231      * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
232      */
233     KvStoreNbDelegate::Option option;
234     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
235     ASSERT_TRUE(g_kvDelegateStatus == OK);
236     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
237     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
238     ASSERT_TRUE(g_deviceB != nullptr);
239     VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
240     ASSERT_TRUE(syncInterfaceB != nullptr);
241     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
242 
243     g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
244     ASSERT_TRUE(g_deviceC != nullptr);
245     VirtualSingleVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
246     ASSERT_TRUE(syncInterfaceC != nullptr);
247     ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
248 
249     auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
250         const std::string &deviceId, uint8_t flag) -> bool {
251             return true;
252         };
253     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
254 }
255 
TearDown(void)256 void DistributedDBSingleVerP2PComplexSyncTest::TearDown(void)
257 {
258     /**
259      * @tc.teardown: Release device A, B, C
260      */
261     if (g_kvDelegatePtr != nullptr) {
262         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
263         g_kvDelegatePtr = nullptr;
264         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
265         LOGD("delete kv store status %d", status);
266         ASSERT_TRUE(status == OK);
267     }
268     if (g_deviceB != nullptr) {
269         delete g_deviceB;
270         g_deviceB = nullptr;
271     }
272     if (g_deviceC != nullptr) {
273         delete g_deviceC;
274         g_deviceC = nullptr;
275     }
276     PermissionCheckCallbackV2 nullCallback;
277     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
278 }
279 
280 /**
281   * @tc.name: SaveDataNotify001
282   * @tc.desc: Test SaveDataNotify function, delay < 30s should sync ok, > 36 should timeout
283   * @tc.type: FUNC
284   * @tc.require: AR000D4876
285   * @tc.author: xushaohua
286   */
287 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SaveDataNotify001, TestSize.Level3)
288 {
289     DBStatus status = OK;
290     const int waitFiveSeconds = 5000;
291     const int waitThirtySeconds = 30000;
292     const int waitThirtySixSeconds = 36000;
293     std::vector<std::string> devices;
294     devices.push_back(g_deviceB->GetDeviceId());
295 
296     /**
297      * @tc.steps: step1. deviceA put {k1, v1}
298      */
299     Key key = {'1'};
300     Value value = {'1'};
301     status = g_kvDelegatePtr->Put(key, value);
302     ASSERT_TRUE(status == OK);
303 
304     /**
305      * @tc.steps: step2. deviceB set sava data dely 5s
306      */
307     g_deviceB->SetSaveDataDelayTime(waitFiveSeconds);
308 
309     /**
310      * @tc.steps: step3. deviceA call sync and wait
311      * @tc.expected: step3. sync should return OK. onComplete should be called, deviceB sync success.
312      */
313     std::map<std::string, DBStatus> result;
314     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
315     ASSERT_TRUE(status == OK);
316     ASSERT_TRUE(result.size() == devices.size());
317     ASSERT_TRUE(result[DEVICE_B] == OK);
318 
319     /**
320      * @tc.steps: step4. deviceB set sava data dely 30s and put {k1, v1}
321      */
322     g_deviceB->SetSaveDataDelayTime(waitThirtySeconds);
323     status = g_kvDelegatePtr->Put(key, value);
324     ASSERT_TRUE(status == OK);
325      /**
326      * @tc.steps: step3. deviceA call sync and wait
327      * @tc.expected: step3. sync should return OK. onComplete should be called, deviceB sync success.
328      */
329     result.clear();
330     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
331     ASSERT_TRUE(status == OK);
332     ASSERT_TRUE(result.size() == devices.size());
333     ASSERT_TRUE(result[DEVICE_B] == OK);
334 
335     /**
336      * @tc.steps: step4. deviceB set sava data dely 36s and put {k1, v1}
337      */
338     g_deviceB->SetSaveDataDelayTime(waitThirtySixSeconds);
339     status = g_kvDelegatePtr->Put(key, value);
340     ASSERT_TRUE(status == OK);
341     /**
342      * @tc.steps: step5. deviceA call sync and wait
343      * @tc.expected: step5. sync should return OK. onComplete should be called, deviceB sync TIME_OUT.
344      */
345     result.clear();
346     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
347     ASSERT_TRUE(status == OK);
348     ASSERT_TRUE(result.size() == devices.size());
349     ASSERT_TRUE(result[DEVICE_B] == TIME_OUT);
350 }
351 
352 /**
353   * @tc.name: SametimeSync001
354   * @tc.desc: Test 2 device sync with each other
355   * @tc.type: FUNC
356   * @tc.require: AR000CCPOM
357   * @tc.author: zhangqiquan
358   */
359 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SametimeSync001, TestSize.Level3)
360 {
361     DBStatus status = OK;
362     std::vector<std::string> devices;
363     devices.push_back(g_deviceB->GetDeviceId());
364 
365     int responseCount = 0;
366     int requestCount = 0;
367     Key key = {'1'};
368     Value value = {'1'};
369     /**
370      * @tc.steps: step1. make sure deviceB send pull firstly and response_pull secondly
371      * @tc.expected: step1. deviceA put data when finish push task. put data should return OK.
372      */
373     g_communicatorAggregator->RegOnDispatch([&responseCount, &requestCount, &key, &value](
__anon61e499a10302( const std::string &target, DistributedDB::Message *msg) 374         const std::string &target, DistributedDB::Message *msg) {
375         if (target != "real_device" || msg->GetMessageId() != DATA_SYNC_MESSAGE) {
376             return;
377         }
378 
379         if (msg->GetMessageType() == TYPE_RESPONSE) {
380             responseCount++;
381             if (responseCount == 1) { // 1 is the ack which B response A's push task
382                 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), DBStatus::OK);
383                 std::this_thread::sleep_for(std::chrono::seconds(1));
384             } else if (responseCount == 2) { // 2 is the ack which B response A's response_pull task
385                 msg->SetErrorNo(E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
386             }
387         } else if (msg->GetMessageType() == TYPE_REQUEST) {
388             requestCount++;
389             if (requestCount == 1) { // 1 is A push task
390                 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2 sec
391             }
392         }
393     });
394     /**
395      * @tc.steps: step2. deviceA,deviceB sync to each other at same time
396      * @tc.expected: step2. sync should return OK.
397      */
398     std::map<std::string, DBStatus> result;
__anon61e499a10402null399     std::thread subThread([]{
400         g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, true);
401     });
402     status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_PULL, result);
403     subThread.join();
404     g_communicatorAggregator->RegOnDispatch(nullptr);
405 
406     EXPECT_TRUE(status == OK);
407     ASSERT_TRUE(result.size() == devices.size());
408     EXPECT_TRUE(result[DEVICE_B] == OK);
409     Value actualValue;
410     g_kvDelegatePtr->Get(key, actualValue);
411     EXPECT_EQ(actualValue, value);
412 }
413 
414 /**
415   * @tc.name: SametimeSync002
416   * @tc.desc: Test 2 device sync with each other with water error
417   * @tc.type: FUNC
418   * @tc.require: AR000CCPOM
419   * @tc.author: zhangqiquan
420   */
421 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SametimeSync002, TestSize.Level3)
422 {
423     DBStatus status = OK;
424     std::vector<std::string> devices;
425     devices.push_back(g_deviceB->GetDeviceId());
426     g_kvDelegatePtr->Put({'k', '1'}, {'v', '1'});
427     /**
428      * @tc.steps: step1. make sure deviceA push data failed and increase water mark
429      * @tc.expected: step1. deviceA push failed with timeout
430      */
__anon61e499a10502(const std::string &target, DistributedDB::Message *msg) 431     g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
432         ASSERT_NE(msg, nullptr);
433         if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
434             msg->SetMessageId(INVALID_MESSAGE_ID);
435         }
436     });
437     std::map<std::string, DBStatus> result;
__anon61e499a10602(const std::map<std::string, DBStatus> &map) 438     auto callback = [&result](const std::map<std::string, DBStatus> &map) {
439         result = map;
440     };
441     Query query = Query::Select().PrefixKey({'k', '1'});
442     EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
443     ASSERT_TRUE(result.size() == devices.size());
444     EXPECT_TRUE(result[DEVICE_B] == TIME_OUT);
445     /**
446      * @tc.steps: step2. A push to B with query2, sleep 1s for waiting step3
447      * @tc.expected: step2. sync should return OK.
448      */
__anon61e499a10702(const std::string &target, DistributedDB::Message *msg) 449     g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
450         ASSERT_NE(msg, nullptr);
451         if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
452             std::this_thread::sleep_for(std::chrono::seconds(1));
453         }
454     });
__anon61e499a10802null455     std::thread subThread([&devices] {
456         std::map<std::string, DBStatus> result;
457         auto callback = [&result](const std::map<std::string, DBStatus> &map) {
458             result = map;
459         };
460         Query query = Query::Select().PrefixKey({'k', '2'});
461         LOGD("Begin PUSH");
462         EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
463         ASSERT_TRUE(result.size() == devices.size());
464         EXPECT_TRUE(result[DEVICE_A] == OK);
465     });
466     /**
467      * @tc.steps: step3. B pull to A when A is in push task
468      * @tc.expected: step3. sync should return OP_FINISHED_ALL.
469      */
470     std::this_thread::sleep_for(std::chrono::milliseconds(100));
471     std::map<std::string, int> virtualResult;
472     g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, query,
__anon61e499a10a02(const std::map<std::string, int> &map) 473         [&virtualResult](const std::map<std::string, int> &map) {
474             virtualResult = map;
475         }, true);
476     EXPECT_TRUE(status == OK);
477     ASSERT_EQ(virtualResult.size(), devices.size());
478     EXPECT_EQ(virtualResult[DEVICE_A], SyncOperation::OP_FINISHED_ALL);
479     g_communicatorAggregator->RegOnDispatch(nullptr);
480     subThread.join();
481 }
482 
483 /**
484  * @tc.name: DatabaseOnlineCallback001
485  * @tc.desc: check database status notify online callback
486  * @tc.type: FUNC
487  * @tc.require: AR000CQS3S SR000CQE0B
488  * @tc.author: zhuwentao
489  */
490 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DatabaseOnlineCallback001, TestSize.Level1)
491 {
492     /**
493      * @tc.steps: step1. SetStoreStatusNotifier
494      * @tc.expected: step1. SetStoreStatusNotifier ok
495      */
496     std::string targetDev = "DEVICE_X";
497     bool isCheckOk = false;
498     auto databaseStatusNotifyCallback = [targetDev, &isCheckOk] (const std::string &userId,
__anon61e499a10b02(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) 499         const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) -> void {
500         if (userId == USER_ID && appId == APP_ID && storeId == STORE_ID && deviceId == targetDev &&
501             onlineStatus == true) {
502             isCheckOk = true;
503         }};
504     g_mgr.SetStoreStatusNotifier(databaseStatusNotifyCallback);
505     /**
506      * @tc.steps: step2. trigger device online
507      * @tc.expected: step2. check callback ok
508      */
509     g_communicatorAggregator->OnlineDevice(targetDev);
510     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME / 20));
511     EXPECT_EQ(isCheckOk, true);
512     StoreStatusNotifier nullCallback;
513     g_mgr.SetStoreStatusNotifier(nullCallback);
514 }
515 
516 /**
517  * @tc.name: DatabaseOfflineCallback001
518  * @tc.desc: check database status notify online callback
519  * @tc.type: FUNC
520  * @tc.require: AR000CQS3S SR000CQE0B
521  * @tc.author: zhuwentao
522  */
523 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DatabaseOfflineCallback001, TestSize.Level1)
524 {
525     /**
526      * @tc.steps: step1. SetStoreStatusNotifier
527      * @tc.expected: step1. SetStoreStatusNotifier ok
528      */
529     std::string targetDev = "DEVICE_X";
530     bool isCheckOk = false;
531     auto databaseStatusNotifyCallback = [targetDev, &isCheckOk] (const std::string &userId,
__anon61e499a10c02(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) 532         const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) -> void {
533         if (userId == USER_ID && appId == APP_ID && storeId == STORE_ID && deviceId == targetDev &&
534             onlineStatus == false) {
535             isCheckOk = true;
536         }};
537     g_mgr.SetStoreStatusNotifier(databaseStatusNotifyCallback);
538     /**
539      * @tc.steps: step2. trigger device offline
540      * @tc.expected: step2. check callback ok
541      */
542     g_communicatorAggregator->OfflineDevice(targetDev);
543     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME / 20));
544     EXPECT_EQ(isCheckOk, true);
545     StoreStatusNotifier nullCallback;
546     g_mgr.SetStoreStatusNotifier(nullCallback);
547 }
548 
549 /**
550   * @tc.name: CloseSync001
551   * @tc.desc: Test 2 delegate close when sync
552   * @tc.type: FUNC
553   * @tc.require: AR000CCPOM
554   * @tc.author: zhangqiquan
555   */
556 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, CloseSync001, TestSize.Level3)
557 {
558     DBStatus status = OK;
559     std::vector<std::string> devices;
560     devices.push_back(g_deviceB->GetDeviceId());
561 
562     /**
563      * @tc.steps: step1. make sure A sync start
564      */
565     bool sleep = false;
__anon61e499a10d02(const std::string &target, DistributedDB::Message *msg) 566     g_communicatorAggregator->RegOnDispatch([&sleep](const std::string &target, DistributedDB::Message *msg) {
567         if (!sleep) {
568             sleep = true;
569             std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s for waiting close db
570         }
571     });
572 
573     KvStoreNbDelegate* kvDelegatePtrA = nullptr;
574     KvStoreNbDelegate::Option option;
__anon61e499a10e02(DBStatus s, KvStoreNbDelegate *delegate) 575     g_mgr.GetKvStore(STORE_ID, option, [&status, &kvDelegatePtrA](DBStatus s, KvStoreNbDelegate *delegate) {
576         status = s;
577         kvDelegatePtrA = delegate;
578     });
579     EXPECT_EQ(status, OK);
580     EXPECT_NE(kvDelegatePtrA, nullptr);
581 
582     Key key = {'k'};
583     Value value = {'v'};
584     kvDelegatePtrA->Put(key, value);
585     std::map<std::string, DBStatus> result;
__anon61e499a10f02(const std::map<std::string, DBStatus>& statusMap) 586     auto callback = [&result](const std::map<std::string, DBStatus>& statusMap) {
587         result = statusMap;
588     };
589     /**
590      * @tc.steps: step2. deviceA sync and then close
591      * @tc.expected: step2. sync should abort and data don't exist in B
592      */
__anon61e499a11002() 593     std::thread closeThread([&kvDelegatePtrA]() {
594         std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for waiting sync start
595         EXPECT_EQ(g_mgr.CloseKvStore(kvDelegatePtrA), OK);
596     });
597     EXPECT_EQ(kvDelegatePtrA->Sync(devices, SYNC_MODE_PUSH_ONLY, callback, false), OK);
598     LOGD("Sync finish");
599     closeThread.join();
600     std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s for waiting sync finish
601     EXPECT_EQ(result.size(), 0u);
602     VirtualDataItem actualValue;
603     EXPECT_EQ(g_deviceB->GetData(key, actualValue), -E_NOT_FOUND);
604     g_communicatorAggregator->RegOnDispatch(nullptr);
605 }
606 
607 /**
608   * @tc.name: CloseSync002
609   * @tc.desc: Test 1 delegate close when in time sync
610   * @tc.type: FUNC
611   * @tc.require: AR000CCPOM
612   * @tc.author: zhangqiquan
613   */
614 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, CloseSync002, TestSize.Level3)
615 {
616     /**
617      * @tc.steps: step1. invalid time sync packet from A
618      */
__anon61e499a11102(const std::string &target, DistributedDB::Message *msg) 619     g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
620         ASSERT_NE(msg, nullptr);
621         if (target == DEVICE_B && msg->GetMessageId() == TIME_SYNC_MESSAGE && msg->GetMessageType() == TYPE_REQUEST) {
622             msg->SetMessageId(INVALID_MESSAGE_ID);
623             LOGD("Message is invalid");
624         }
625     });
626     Timestamp currentTime;
627     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
628     g_deviceB->PutData({'k'}, {'v'}, currentTime, 0);
629 
630     /**
631      * @tc.steps: step2. B PUSH to A and A close after 1s
632      * @tc.expected: step2. A closing time cost letter than 4s
633      */
__anon61e499a11202() 634     std::thread closingThread([]() {
635         std::this_thread::sleep_for(std::chrono::seconds(1));
636         LOGD("Begin Close");
637         Timestamp beginTime;
638         (void)OS::GetCurrentSysTimeInMicrosecond(beginTime);
639         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
640         Timestamp endTime;
641         (void)OS::GetCurrentSysTimeInMicrosecond(endTime);
642         EXPECT_LE(static_cast<int>(endTime - beginTime), 4 * 1000 * 1000); // waiting 4 * 1000 * 1000 us
643         LOGD("End Close");
644     });
645     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
646     closingThread.join();
647 
648     /**
649      * @tc.steps: step3. remove db
650      * @tc.expected: step3. remove ok
651      */
652     g_kvDelegatePtr = nullptr;
653     DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
654     LOGD("delete kv store status %d", status);
655     ASSERT_TRUE(status == OK);
656     g_communicatorAggregator->RegOnDispatch(nullptr);
657 }
658 
659 /**
660   * @tc.name: OrderbyWriteTimeSync001
661   * @tc.desc: sync query with order by writeTime
662   * @tc.type: FUNC
663   * @tc.require: AR000H5VLO
664   * @tc.author: zhuwentao
665   */
666 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, OrderbyWriteTimeSync001, TestSize.Level0)
667 {
668     /**
669      * @tc.steps: step1. deviceA subscribe query with order by write time
670      * * @tc.expected: step1. interface return not support
671     */
672     std::vector<std::string> devices;
673     devices.push_back(g_deviceB->GetDeviceId());
674     Query query = Query::Select().PrefixKey({'k'}).OrderByWriteTime(true);
675     EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, nullptr, query, true), NOT_SUPPORT);
676 }
677 
678 
679 /**
680  * @tc.name: Device Offline Sync 001
681  * @tc.desc: Test push sync when device offline
682  * @tc.type: FUNC
683  * @tc.require: AR000CCPOM
684  * @tc.author: xushaohua
685  */
686 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSync001, TestSize.Level1)
687 {
688     std::vector<std::string> devices;
689     devices.push_back(g_deviceB->GetDeviceId());
690     devices.push_back(g_deviceC->GetDeviceId());
691 
692     /**
693      * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2}, {k3 delete}, {k4,v2}
694      */
695     Key key1 = {'1'};
696     Value value1 = {'1'};
697     ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value1) == OK);
698 
699     Key key2 = {'2'};
700     Value value2 = {'2'};
701     ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value2) == OK);
702 
703     Key key3 = {'3'};
704     Value value3 = {'3'};
705     ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value3) == OK);
706     ASSERT_TRUE(g_kvDelegatePtr->Delete(key3) == OK);
707 
708     Key key4 = {'4'};
709     Value value4 = {'4'};
710     ASSERT_TRUE(g_kvDelegatePtr->Put(key4, value4) == OK);
711 
712     /**
713      * @tc.steps: step2. deviceB offline
714      */
715     g_deviceB->Offline();
716 
717     /**
718      * @tc.steps: step3. deviceA call pull sync
719      * @tc.expected: step3. sync should return OK.
720      */
721     std::map<std::string, DBStatus> result;
722     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
723     ASSERT_TRUE(status == OK);
724 
725     /**
726      * @tc.expected: step3. onComplete should be called, DeviceB status is timeout
727      *     deviceC has {k1, v1}, {k2, v2}, {k3 delete}, {k4,v4}
728      */
729     for (const auto &pair : result) {
730         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
731         if (pair.first == DEVICE_B) {
732             EXPECT_TRUE(pair.second == COMM_FAILURE);
733         } else {
734             EXPECT_TRUE(pair.second == OK);
735         }
736     }
737     VirtualDataItem item;
738     g_deviceC->GetData(key1, item);
739     EXPECT_TRUE(item.value == value1);
740     item.value.clear();
741     g_deviceC->GetData(key2, item);
742     EXPECT_TRUE(item.value == value2);
743     item.value.clear();
744     Key hashKey;
745     DistributedDBToolsUnitTest::CalcHash(key3, hashKey);
746     EXPECT_TRUE(g_deviceC->GetData(hashKey, item) == -E_NOT_FOUND);
747     item.value.clear();
748     g_deviceC->GetData(key4, item);
749     EXPECT_TRUE(item.value == value4);
750 }
751 
752 /**
753  * @tc.name: Device Offline Sync 002
754  * @tc.desc: Test pull sync from deviceB and deviceC when deviceB is offline
755  * @tc.type: FUNC
756  * @tc.require: AR000CCPOM
757  * @tc.author: xushaohua
758  */
759 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSync002, TestSize.Level1)
760 {
761     std::vector<std::string> devices;
762     devices.push_back(g_deviceB->GetDeviceId());
763     devices.push_back(g_deviceC->GetDeviceId());
764 
765     /**
766      * @tc.steps: step1. deviceB put {k1, v1}
767      */
768     Key key1 = {'1'};
769     Value value1 = {'1'};
770     g_deviceB->PutData(key1, value1, 0, 0);
771 
772     /**
773      * @tc.steps: step2. deviceB offline
774      */
775     g_deviceB->Offline();
776 
777     /**
778      * @tc.steps: step3. deviceC put {k2, v2}, {k3, delete}, {k4, v4}
779      */
780     Key key2 = {'2'};
781     Value value2 = {'2'};
782     g_deviceC->PutData(key2, value2, 0, 0);
783 
784     Key key3 = {'3'};
785     Value value3 = {'3'};
786     g_deviceC->PutData(key3, value3, 0, 1); // last argument is 1 only for k3: presumably the delete flag - TODO confirm
787 
788     Key key4 = {'4'};
789     Value value4 = {'4'};
790     g_deviceC->PutData(key4, value4, 0, 0);
791 
792     /**
793      * @tc.steps: step4. deviceA call pull sync for both devices
794      * @tc.expected: step4. sync should return OK.
795      */
796     std::map<std::string, DBStatus> result;
797     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
798     ASSERT_TRUE(status == OK);
799 
800     /**
801      * @tc.expected: step4. onComplete should be called, DeviceB status is COMM_FAILURE, other devices are OK
802      *     deviceA can read {k2, v2} and {k4, v4}; k1 (only on deviceB) and k3 (delete) can not be read
803      */
804     for (const auto &pair : result) {
805         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
806         if (pair.first == DEVICE_B) {
807             EXPECT_TRUE(pair.second == COMM_FAILURE);
808         } else {
809             EXPECT_TRUE(pair.second == OK);
810         }
811     }
812 
813     Value value5; // reused as the output buffer for every Get below
814     EXPECT_TRUE(g_kvDelegatePtr->Get(key1, value5) != OK);
815     g_kvDelegatePtr->Get(key2, value5);
816     EXPECT_EQ(value5, value2);
817     EXPECT_TRUE(g_kvDelegatePtr->Get(key3, value5) != OK);
818     g_kvDelegatePtr->Get(key4, value5);
819     EXPECT_EQ(value5, value4);
820 }
821 
822 /**
823   * @tc.name: EncryptedAlgoUpgrade001
824   * @tc.desc: Test that an encrypted db created directly through sqlite can be opened as a kv store and synced
825   * @tc.type: FUNC
826   * @tc.require: AR000HI2JS
827   * @tc.author: zhuwentao
828   */
829 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, EncryptedAlgoUpgrade001, TestSize.Level3)
830 {
831     /**
832      * @tc.steps: step1. close and delete the currently opened kv store, if any
833      * @tc.expected: step1. interface return ok
834     */
835     if (g_kvDelegatePtr != nullptr) {
836         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
837         g_kvDelegatePtr = nullptr;
838         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
839         LOGD("delete kv store status %d", status);
840         ASSERT_TRUE(status == OK);
841     }
842 
843     CipherPassword passwd;
844     std::vector<uint8_t> passwdVect = {'p', 's', 'd', '1'};
845     passwd.SetValue(passwdVect.data(), passwdVect.size());
846     /**
847      * @tc.steps: step2. create the store directories by hand and open the main db file through sqlite directly
848      * @tc.expected: step2. interface return ok
849     */
850     std::string identifier = DBCommon::GenerateIdentifierId(STORE_ID, APP_ID, USER_ID);
851     std::string hashDir = DBCommon::TransferHashString(identifier);
852     std::string hexHashDir = DBCommon::TransferStringToHex(hashDir);
853     std::string dbPath = g_testDir + "/" + hexHashDir + "/single_ver";
854     ASSERT_TRUE(DBCommon::CreateDirectory(g_testDir + "/" + hexHashDir) == OK);
855     ASSERT_TRUE(DBCommon::CreateDirectory(dbPath) == OK);
856     std::vector<std::string> dbDir {DBConstant::MAINDB_DIR, DBConstant::METADB_DIR, DBConstant::CACHEDB_DIR};
857     for (const auto &item : dbDir) {
858         ASSERT_TRUE(DBCommon::CreateDirectory(dbPath + "/" + item) == OK);
859     }
860     uint64_t flag = SQLITE_OPEN_URI | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
861     sqlite3 *db;
862     std::string fileUrl = dbPath + "/" + DBConstant::MAINDB_DIR + "/" + DBConstant::SINGLE_VER_DATA_STORE + ".db";
863     ASSERT_TRUE(sqlite3_open_v2(fileUrl.c_str(), &db, flag, nullptr) == SQLITE_OK);
864     SQLiteUtils::SetKeyInner(db, CipherType::AES_256_GCM, passwd, DBConstant::DEFAULT_ITER_TIMES); // result is not checked
865     /**
866      * @tc.steps: step3. create the sync_data table and close the sqlite handle
867      * @tc.expected: step3. interface return ok
868     */
869     ASSERT_TRUE(SQLiteUtils::ExecuteRawSQL(db, CREATE_SYNC_TABLE_SQL) == E_OK);
870     sqlite3_close_v2(db);
871     db = nullptr;
872     LOGI("create old db success");
873     /**
874      * @tc.steps: step4. get kvstore with the same password and the AES_256_GCM cipher
875      * @tc.expected: step4. interface return ok
876     */
877     KvStoreNbDelegate::Option option;
878     option.isEncryptedDb = true;
879     option.cipher = CipherType::AES_256_GCM;
880     option.passwd = passwd;
881     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
882     ASSERT_TRUE(g_kvDelegateStatus == OK);
883     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
884     /**
885      * @tc.steps: step5. sync ok
886      * @tc.expected: step5. interface return ok
887     */
888     PullSyncTest();
889     /**
890      * @tc.steps: step6. crud ok
891      * @tc.expected: step6. interface return ok
892     */
893     CrudTest();
894 }
895 
896 /**
897   * @tc.name: RemoveDeviceData002
898   * @tc.desc: test that data removed by RemoveDeviceData is received again by the next sync
899   * @tc.type: FUNC
900   * @tc.require:
901   * @tc.author: zhuwentao
902   */
903 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RemoveDeviceData002, TestSize.Level1)
904 {
905     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
906     /**
907      * @tc.steps: step1. deviceB put k1 and k2, push them to A and check data on A
908      * @tc.expected: step1. interface return ok
909     */
910     Key key1 = {'1'};
911     Key key2 = {'2'};
912     Value value = {'1'};
913     Timestamp currentTime;
914     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
915     EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
916     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
917     EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
918     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
919     Value actualValue;
920     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
921     EXPECT_EQ(actualValue, value);
922     actualValue.clear();
923     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
924     EXPECT_EQ(actualValue, value);
925     /**
926      * @tc.steps: step2. call RemoveDeviceData for deviceB (its return value is not checked)
927      * @tc.expected: step2. k1 and k2 are NOT_FOUND on A
928     */
929     g_kvDelegatePtr->RemoveDeviceData(g_deviceB->GetDeviceId());
930     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
931     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), NOT_FOUND);
932     /**
933      * @tc.steps: step3. sync to device A again and check data
934      * @tc.expected: step3. sync ok, k1 and k2 can be read on A again
935     */
936     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
937     actualValue.clear();
938     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
939     EXPECT_EQ(actualValue, value);
940     actualValue.clear();
941     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
942     EXPECT_EQ(actualValue, value);
943 }
944 
945 /**
946   * @tc.name: DataSync001
947   * @tc.desc: Test that SingleVerDataSync::Initialize returns -E_INVALID_ARGS when a param is null
948   * @tc.type: FUNC
949   * @tc.require: AR000HI2JS
950   * @tc.author: zhuwentao
951   */
952 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync001, TestSize.Level1)
953 {
954     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
955     ASSERT_TRUE(dataSync != nullptr);
956     std::shared_ptr<Metadata> inMetadata = nullptr;
957     std::string deviceId;
958     Message message; // NOTE(review): not used in this test
959     VirtualSingleVerSyncDBInterface tmpInterface;
960     VirtualCommunicator tmpCommunicator(deviceId, g_communicatorAggregator);
961     EXPECT_EQ(dataSync->Initialize(nullptr, nullptr, inMetadata, deviceId), -E_INVALID_ARGS); // first two args null
962     EXPECT_EQ(dataSync->Initialize(&tmpInterface, nullptr, inMetadata, deviceId), -E_INVALID_ARGS); // second arg null
963     EXPECT_EQ(dataSync->Initialize(&tmpInterface, &tmpCommunicator, inMetadata, deviceId), -E_INVALID_ARGS); // null metadata
964     delete dataSync;
965 }
966 
967 /**
968   * @tc.name: DataSync002
969   * @tc.desc: Test that the active sync entry points of DataSync Class return -E_INVALID_ARGS for a null context
970   * @tc.type: FUNC
971   * @tc.require: AR000HI2JS
972   * @tc.author: zhuwentao
973   */
974 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync002, TestSize.Level1)
975 {
976     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
977     ASSERT_TRUE(dataSync != nullptr);
978     Message message;
979     EXPECT_EQ(dataSync->TryContinueSync(nullptr, &message), -E_INVALID_ARGS); // null context, valid message
980     EXPECT_EQ(dataSync->TryContinueSync(nullptr, nullptr), -E_INVALID_ARGS); // both null
981     EXPECT_EQ(dataSync->PushStart(nullptr), -E_INVALID_ARGS);
982     EXPECT_EQ(dataSync->PushPullStart(nullptr), -E_INVALID_ARGS);
983     EXPECT_EQ(dataSync->PullRequestStart(nullptr), -E_INVALID_ARGS);
984     EXPECT_EQ(dataSync->PullResponseStart(nullptr), -E_INVALID_ARGS);
985     delete dataSync;
986 }
987 
988 /**
989   * @tc.name: DataSync003
990   * @tc.desc: Test that DataRequestRecv in DataSync Class returns -E_INVALID_ARGS for a null context
991   * @tc.type: FUNC
992   * @tc.require: AR000HI2JS
993   * @tc.author: zhuwentao
994   */
995 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync003, TestSize.Level1)
996 {
997     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
998     ASSERT_TRUE(dataSync != nullptr);
999     uint64_t tmpMark = 0;
1000     Message message;
1001     EXPECT_EQ(dataSync->DataRequestRecv(nullptr, nullptr, tmpMark), -E_INVALID_ARGS); // null context and message
1002     EXPECT_EQ(dataSync->DataRequestRecv(nullptr, &message, tmpMark), -E_INVALID_ARGS); // null context only
1003     delete dataSync;
1004 }
1005 
1006 /**
1007   * @tc.name: DataSync004
1008   * @tc.desc: Test AckPacketIdCheck and AckRecv in DataSync Class with null or default-constructed input
1009   * @tc.type: FUNC
1010   * @tc.require: AR000HI2JS
1011   * @tc.author: zhuwentao
1012   */
1013 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync004, TestSize.Level1)
1014 {
1015     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1016     ASSERT_TRUE(dataSync != nullptr);
1017     Message message;
1018     TestSingleVerKvSyncTaskContext tmpContext;
1019     EXPECT_EQ(dataSync->AckPacketIdCheck(nullptr), false); // null message
1020     EXPECT_EQ(dataSync->AckPacketIdCheck(&message), false); // default-constructed message
1021     EXPECT_EQ(dataSync->AckRecv(&tmpContext, nullptr), -E_INVALID_ARGS); // valid context, null message
1022     EXPECT_EQ(dataSync->AckRecv(nullptr, nullptr), -E_INVALID_ARGS); // both null
1023     EXPECT_EQ(dataSync->AckRecv(nullptr, &message), -E_INVALID_ARGS); // null context, valid message
1024     delete dataSync;
1025 }
1026 
1027 /**
1028   * @tc.name: DataSync005
1029   * @tc.desc: Test receive invalid notify packet in DataSync Class (the checks live in the DataSync005() helper)
1030   * @tc.type: FUNC
1031   * @tc.require: AR000HI2JS
1032   * @tc.author: zhuwentao
1033   */
1034 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync005, TestSize.Level1)
1035 {
1036     ASSERT_NO_FATAL_FAILURE(DataSync005()); // a fatal failure inside the helper fails this test as well
1037 }
1038 
1039 /**
1040   * @tc.name: DataSync006
1041   * @tc.desc: Test ControlCmdStart in DataSync Class with a null context, an invalid mode and an InKeys subscribe query
1042   * @tc.type: FUNC
1043   * @tc.require: AR000HI2JS
1044   * @tc.author: zhuwentao
1045   */
1046 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync006, TestSize.Level1)
1047 {
1048     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1049     ASSERT_TRUE(dataSync != nullptr);
1050     TestSingleVerKvSyncTaskContext tmpContext;
1051     EXPECT_EQ(dataSync->ControlCmdStart(nullptr), -E_INVALID_ARGS); // null context
1052     EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_INVALID_ARGS); // context before a subscribe manager is set
1053     std::shared_ptr<SubscribeManager> subManager = std::make_shared<SubscribeManager>();
1054     tmpContext.SetSubscribeManager(subManager);
1055     tmpContext.SetMode(SyncModeType::INVALID_MODE);
1056     EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_INVALID_ARGS); // subscribe manager set, mode is INVALID_MODE
1057     std::set<Key> Keys = {{'a'}, {'b'}};
1058     Query query = Query::Select().InKeys(Keys);
1059     QuerySyncObject innerQuery(query);
1060     tmpContext.SetQuery(innerQuery);
1061     tmpContext.SetMode(SyncModeType::SUBSCRIBE_QUERY);
1062     EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_NOT_SUPPORT); // SUBSCRIBE_QUERY mode with an InKeys query
1063     delete dataSync;
1064     subManager = nullptr;
1065 }
1066 
1067 /**
1068   * @tc.name: DataSync007
1069   * @tc.desc: Test that ControlCmdRequestRecv in DataSync Class returns -E_INVALID_ARGS for a null context
1070   * @tc.type: FUNC
1071   * @tc.require: AR000HI2JS
1072   * @tc.author: zhuwentao
1073   */
1074 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync007, TestSize.Level1)
1075 {
1076     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1077     ASSERT_TRUE(dataSync != nullptr);
1078     Message message;
1079     ControlRequestPacket packet;
1080     TestSingleVerKvSyncTaskContext tmpContext; // NOTE(review): not used in this test
1081     EXPECT_EQ(dataSync->ControlCmdRequestRecv(nullptr, &message), -E_INVALID_ARGS); // message without a packet
1082     message.SetCopiedObject(packet);
1083     EXPECT_EQ(dataSync->ControlCmdRequestRecv(nullptr, &message), -E_INVALID_ARGS); // message carrying a default packet
1084     delete dataSync;
1085 }
1086 
1087 /**
1088   * @tc.name: DataSync008
1089   * @tc.desc: Test pull null msg in dataQueue in DataSync Class (the checks live in the DataSync008() helper)
1090   * @tc.type: FUNC
1091   * @tc.require: AR000HI2JS
1092   * @tc.author: zhuwentao
1093   */
1094 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync008, TestSize.Level1)
1095 {
1096     ASSERT_NO_FATAL_FAILURE(DataSync008()); // a fatal failure inside the helper fails this test as well
1097 }
1098 
1099 /**
1100  * @tc.name: SyncRetry001
1101  * @tc.desc: push sync with sync retry enabled while a data sync message of deviceB is dropped
1102  * @tc.type: FUNC
1103  * @tc.require: AR000CKRTD AR000CQE0E
1104  * @tc.author: zhuwentao
1105  */
1106 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry001, TestSize.Level3)
1107 {
1108     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1109     std::vector<std::string> devices;
1110     devices.push_back(g_deviceB->GetDeviceId());
1111 
1112     /**
1113      * @tc.steps: step1. set sync retry
1114      * @tc.expected: step1, Pragma return OK.
1115      */
1116     int pragmaData = 1;
1117     PragmaData input = static_cast<PragmaData>(&pragmaData);
1118     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1119 
1120     /**
1121      * @tc.steps: step2. deviceA put {k1, v1}
1122      */
1123     ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
1124 
1125     /**
1126      * @tc.steps: step3. deviceA call sync and wait
1127      * @tc.expected: step3. sync should return OK.
1128      */
1129     std::map<std::string, DBStatus> result;
1130     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1131 
1132     /**
1133      * @tc.expected: step4. onComplete should be called, and status is OK for every device
1134      */
1135     ASSERT_TRUE(result.size() == devices.size());
1136     for (const auto &pair : result) {
1137         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1138         EXPECT_TRUE(pair.second == OK);
1139     }
1140     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE); // presumably clears the drop rule - TODO confirm
1141 }
1142 
1143 /**
1144  * @tc.name: SyncRetry002
1145  * @tc.desc: pull sync with sync retry enabled ends with TIME_OUT when data sync messages of deviceB keep being dropped
1146  * @tc.type: FUNC
1147  * @tc.require: AR000CKRTD AR000CQE0E
1148  * @tc.author: zhuwentao
1149  */
1150 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry002, TestSize.Level3)
1151 {
1152     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE, 4u); // 4u: presumably the drop count - TODO confirm
1153     std::vector<std::string> devices;
1154     devices.push_back(g_deviceB->GetDeviceId());
1155 
1156     /**
1157      * @tc.steps: step1. set sync retry
1158      * @tc.expected: step1, Pragma return OK.
1159      */
1160     int pragmaData = 1;
1161     PragmaData input = static_cast<PragmaData>(&pragmaData);
1162     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1163 
1164     /**
1165      * @tc.steps: step2. deviceA call pull sync and wait
1166      * @tc.expected: step2. sync should return OK.
1167      */
1168     std::map<std::string, DBStatus> result;
1169     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
1170 
1171     /**
1172      * @tc.expected: step3. onComplete should be called, and status is TIME_OUT
1173      */
1174     ASSERT_TRUE(result.size() == devices.size());
1175     for (const auto &pair : result) {
1176         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1177         EXPECT_TRUE(pair.second == TIME_OUT);
1178     }
1179     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE); // presumably clears the drop rule - TODO confirm
1180 }
1181 
1182 /**
1183  * @tc.name: SyncRetry003
1184  * @tc.desc: use sync retry sync use push by compress
1185  * @tc.type: FUNC
1186  * @tc.require: AR000CKRTD AR000CQE0E
1187  * @tc.author: zhuwentao
1188  */
1189 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry003, TestSize.Level3)
1190 {
1191     if (g_kvDelegatePtr != nullptr) {
1192         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1193         g_kvDelegatePtr = nullptr;
1194     }
1195     /**
1196      * @tc.steps: step1. reopen db with isNeedCompressOnSync and compressionRate 70
1197      * @tc.expected: step1, GetKvStore return OK and a non-null delegate.
1198      */
1199     KvStoreNbDelegate::Option option;
1200     option.isNeedCompressOnSync = true;
1201     option.compressionRate = 70;
1202     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1203     ASSERT_TRUE(g_kvDelegateStatus == OK);
1204     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1205 
1206     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1207     std::vector<std::string> devices;
1208     devices.push_back(g_deviceB->GetDeviceId());
1209 
1210     /**
1211      * @tc.steps: step2. set sync retry
1212      * @tc.expected: step2, Pragma return OK.
1213      */
1214     int pragmaData = 1;
1215     PragmaData input = static_cast<PragmaData>(&pragmaData);
1216     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1217 
1218     /**
1219      * @tc.steps: step3. deviceA put {k1, v1}
1220      */
1221     ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
1222 
1223     /**
1224      * @tc.steps: step4. deviceA call sync and wait
1225      * @tc.expected: step4. sync should return OK.
1226      */
1227     std::map<std::string, DBStatus> result;
1228     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1229 
1230     /**
1231      * @tc.expected: step5. onComplete should be called, and status is OK for every device
1232      */
1233     ASSERT_TRUE(result.size() == devices.size());
1234     for (const auto &pair : result) {
1235         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1236         EXPECT_TRUE(pair.second == OK);
1237     }
1238     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE); // presumably clears the drop rule - TODO confirm
1239 }
1240 
1241 /**
1242  * @tc.name: SyncRetry004
1243  * @tc.desc: use query sync retry sync use push
1244  * @tc.type: FUNC
1245  * @tc.require: AR000CKRTD AR000CQE0E
1246  * @tc.author: zhuwentao
1247  */
1248 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry004, TestSize.Level3)
1249 {
1250     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1251     std::vector<std::string> devices;
1252     devices.push_back(g_deviceB->GetDeviceId());
1253 
1254     /**
1255      * @tc.steps: step1. set sync retry
1256      * @tc.expected: step1, Pragma return OK.
1257      */
1258     int pragmaData = 1;
1259     PragmaData input = static_cast<PragmaData>(&pragmaData);
1260     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1261 
1262     /**
1263      * @tc.steps: step2. deviceA put 5 random entries whose keys start with the prefix {'a', 'b'}
1264      */
1265     for (int i = 0; i < 5; i++) {
1266         Key key = DistributedDBToolsUnitTest::GetRandPrefixKey({'a', 'b'}, 128); // NOTE(review): this note used to say 1024, the argument is 128
1267         Value value;
1268         DistributedDBToolsUnitTest::GetRandomKeyValue(value, 256u);
1269         EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
1270     }
1271 
1272     /**
1273      * @tc.steps: step3. deviceA call sync with a PrefixKey query on {'a', 'b'} and wait
1274      * @tc.expected: step3. sync should return OK.
1275      */
1276     std::map<std::string, DBStatus> result;
1277     std::vector<uint8_t> prefixKey({'a', 'b'});
1278     Query query = Query::Select().PrefixKey(prefixKey);
1279     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query) == OK);
1280 
1281     /**
1282      * @tc.expected: step4. onComplete should be called, and status is OK for every device
1283      */
1284     ASSERT_TRUE(result.size() == devices.size());
1285     for (const auto &pair : result) {
1286         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1287         EXPECT_TRUE(pair.second == OK);
1288     }
1289     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE); // presumably clears the drop rule - TODO confirm
1290 }
1291 
1292 /**
1293  * @tc.name: SyncRetry005
1294  * @tc.desc: use sync retry sync use pull by compress
1295  * @tc.type: FUNC
1296  * @tc.require: AR000CKRTD AR000CQE0E
1297  * @tc.author: zhangqiquan
1298  */
1299 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry005, TestSize.Level3)
1300 {
1301     if (g_kvDelegatePtr != nullptr) {
1302         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1303         g_kvDelegatePtr = nullptr;
1304     }
1305     /**
1306      * @tc.steps: step1. reopen db with isNeedCompressOnSync (compressionRate is left at its default)
1307      * @tc.expected: step1, GetKvStore return OK and a non-null delegate.
1308      */
1309     KvStoreNbDelegate::Option option;
1310     option.isNeedCompressOnSync = true;
1311     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1312     ASSERT_TRUE(g_kvDelegateStatus == OK);
1313     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1314 
1315     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1316     std::vector<std::string> devices;
1317     devices.push_back(g_deviceB->GetDeviceId());
1318 
1319     /**
1320      * @tc.steps: step2. set sync retry
1321      * @tc.expected: step2, Pragma return OK.
1322      */
1323     int pragmaData = 1;
1324     PragmaData input = static_cast<PragmaData>(&pragmaData);
1325     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1326 
1327     /**
1328      * @tc.steps: step3. deviceA call pull sync and wait
1329      * @tc.expected: step3. sync should return OK.
1330      */
1331     std::map<std::string, DBStatus> result;
1332     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
1333 
1334     /**
1335      * @tc.expected: step4. onComplete should be called, and status is OK for every device
1336      */
1337     ASSERT_TRUE(result.size() == devices.size());
1338     for (const auto &pair : result) {
1339         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1340         EXPECT_EQ(pair.second, OK);
1341     }
1342     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE); // presumably clears the drop rule - TODO confirm
1343 }
1344 
1345 /**
1346  * @tc.name: ReSetWaterDogTest001
1347  * @tc.desc: trigger resetWatchDog while pull (the checks live in the ReSetWaterDogTest001() helper)
1348  * @tc.type: FUNC
1349  * @tc.require: AR000CKRTD AR000CQE0E
1350  * @tc.author: zhuwentao
1351  */
1352 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, ReSetWaterDogTest001, TestSize.Level3)
1353 {
1354     ASSERT_NO_FATAL_FAILURE(ReSetWaterDogTest001()); // a fatal failure inside the helper fails this test as well
1355 }
1356 
1357 /**
1358   * @tc.name: RebuildSync001
1359   * @tc.desc: delete and recreate the db on device A, then check that deviceB can sync to it again
1360   * @tc.type: FUNC
1361   * @tc.require:
1362   * @tc.author: zhuwentao
1363   */
1364 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync001, TestSize.Level3)
1365 {
1366     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1367     /**
1368      * @tc.steps: step1. deviceB put k1 and k2, push them to A and check data on A
1369      * @tc.expected: step1. interface return ok
1370     */
1371     Key key1 = {'1'};
1372     Key key2 = {'2'};
1373     Value value = {'1'};
1374     Timestamp currentTime;
1375     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1376     EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1377     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1378     EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
1379     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1380 
1381     Value actualValue;
1382     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1383     EXPECT_EQ(actualValue, value);
1384     actualValue.clear();
1385     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1386     EXPECT_EQ(actualValue, value);
1387     /**
1388      * @tc.steps: step2. close and delete the db on A, then open it again with default options
1389      * @tc.expected: step2. interface return ok
1390     */
1391     g_mgr.CloseKvStore(g_kvDelegatePtr); // return value is not checked here
1392     g_kvDelegatePtr = nullptr;
1393     ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1394     KvStoreNbDelegate::Option option;
1395     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1396     ASSERT_TRUE(g_kvDelegateStatus == OK);
1397     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1398     /**
1399      * @tc.steps: step3. deviceB put a new value for k1 and sync to device A again
1400      * @tc.expected: step3. sync ok
1401     */
1402     value = {'2'};
1403     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1404     EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1405     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1406     /**
1407      * @tc.steps: step4. check data in device A
1408      * @tc.expected: step4. k1 on A holds the new value
1409     */
1410     actualValue.clear();
1411     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1412     EXPECT_EQ(actualValue, value);
1413 }
1414 
1415 /**
1416   * @tc.name: RebuildSync002
1417   * @tc.desc: test clear remote data when receive data
1418   * @tc.type: FUNC
1419   * @tc.require:
1420   * @tc.author: zhuwentao
1421   */
1422 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync002, TestSize.Level1)
1423 {
1424     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1425     std::vector<std::string> devices;
1426     devices.push_back(g_deviceB->GetDeviceId());
1427     /**
1428      * @tc.steps: step1. device A SET_WIPE_POLICY
1429      * @tc.expected: step1. interface return ok
1430     */
1431     int pragmaData = 2; // 2 means enable
1432     PragmaData input = static_cast<PragmaData>(&pragmaData);
1433     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_WIPE_POLICY, input) == OK);
1434     /**
1435      * @tc.steps: step2. deviceB put k1 and k2, deviceA put k3
1436      * @tc.expected: step2. interface return ok
1437     */
1438     Key key1 = {'1'};
1439     Key key2 = {'2'};
1440     Key key3 = {'3'};
1441     Key key4 = {'4'};
1442     Value value = {'1'};
1443     Timestamp currentTime;
1444     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1445     EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1446     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1447     EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
1448     EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
1449     /**
1450      * @tc.steps: step3. deviceA call push-pull sync
1451      * @tc.expected: step3. sync should return OK.
1452      */
1453     std::map<std::string, DBStatus> result;
1454     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result) == OK);
1455 
1456     /**
1457      * @tc.expected: step4. onComplete should be called with OK, and A can read k1 and k2
1458      */
1459     ASSERT_TRUE(result.size() == devices.size());
1460     for (const auto &pair : result) {
1461         EXPECT_TRUE(pair.second == OK);
1462     }
1463     Value actualValue;
1464     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1465     EXPECT_EQ(actualValue, value);
1466     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1467     EXPECT_EQ(actualValue, value);
1468     /**
1469      * @tc.steps: step5. device B is deleted and created again with a new sync interface, then put k3 and k4
1470      * @tc.expected: step5. rebuild ok
1471     */
1472     if (g_deviceB != nullptr) {
1473         delete g_deviceB;
1474         g_deviceB = nullptr;
1475     }
1476     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
1477     ASSERT_TRUE(g_deviceB != nullptr);
1478     VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
1479     ASSERT_TRUE(syncInterfaceB != nullptr);
1480     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
1481     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1482     EXPECT_EQ(g_deviceB->PutData(key3, value, currentTime, 0), E_OK);
1483     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1484     EXPECT_EQ(g_deviceB->PutData(key4, value, currentTime, 0), E_OK);
1485     /**
1486      * @tc.steps: step6. sync to device A again and check data
1487      * @tc.expected: step6. sync ok, A can read k3 and k4, k1 and k2 are NOT_FOUND
1488     */
1489     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1490     EXPECT_EQ(g_kvDelegatePtr->Get(key3, actualValue), OK);
1491     EXPECT_EQ(actualValue, value);
1492     EXPECT_EQ(g_kvDelegatePtr->Get(key4, actualValue), OK);
1493     EXPECT_EQ(actualValue, value);
1494     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1495     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), NOT_FOUND);
1496 }
1497 
1498 /**
1499   * @tc.name: RebuildSync003
1500   * @tc.desc: test clear history data when receive ack
1501   * @tc.type: FUNC
1502   * @tc.require:
1503   * @tc.author: zhuwentao
1504   */
1505 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync003, TestSize.Level1)
1506 {
1507     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1508     /**
1509      * @tc.steps: step1. deviceB put k1 and k2, deviceA put k3, deviceB push-pull sync and check data on both sides
1510      * @tc.expected: step1. interface return ok
1511     */
1512     Key key1 = {'1'};
1513     Key key2 = {'2'};
1514     Key key3 = {'3'};
1515     Key key4 = {'4'};
1516     Value value = {'1'};
1517     EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1518     EXPECT_EQ(g_deviceB->PutData(key2, value, 2u, 0), E_OK); // 2: timestamp
1519     EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
1520     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1521     Value actualValue;
1522     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1523     EXPECT_EQ(actualValue, value);
1524     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1525     EXPECT_EQ(actualValue, value);
1526     VirtualDataItem item;
1527     EXPECT_EQ(g_deviceB->GetData(key3, item), E_OK);
1528     EXPECT_EQ(item.value, value);
1529     /**
1530      * @tc.steps: step2. device B put k4 and sync to device A, but make it failed by dropping a data sync message
1531      * @tc.expected: step2. interface return ok
1532     */
1533     EXPECT_EQ(g_deviceB->PutData(key4, value, 3u, 0), E_OK); // 3: timestamp
1534     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_A, DATA_SYNC_MESSAGE);
1535     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1536     /**
1537      * @tc.steps: step3. device B set delay send time
1538      * @tc.expected: step3. interface return ok
1539     */
1540     std::set<std::string> delayDevice = {DEVICE_B};
1541     g_communicatorAggregator->SetSendDelayInfo(3000u, DATA_SYNC_MESSAGE, 1u, 0u, delayDevice); // delay 3000ms one time
1542     /**
1543      * @tc.steps: step4. device A rebuilt, device B push data to A and set clear remote data mark into context after 1s
1544      * @tc.expected: step4. interface return ok
1545     */
1546     g_deviceB->SetClearRemoteStaleData(true);
1547     g_mgr.CloseKvStore(g_kvDelegatePtr); // return value is not checked here
1548     g_kvDelegatePtr = nullptr;
1549     ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1550     KvStoreNbDelegate::Option option;
1551     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1552     ASSERT_TRUE(g_kvDelegateStatus == OK);
1553     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1554     std::map<std::string, DBStatus> result;
1555     std::vector<std::string> devices = {g_deviceB->GetDeviceId()};
1556     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1557     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1558     /**
1559      * @tc.steps: step5. device B sync to A, make it clear history data and check data
1560      * @tc.expected: step5. k3 is -E_NOT_FOUND on device B, A can read k1 and k2
1561     */
1562     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1563     EXPECT_EQ(g_deviceB->GetData(key3, item), -E_NOT_FOUND);
1564     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1565     EXPECT_EQ(actualValue, value);
1566     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1567     EXPECT_EQ(actualValue, value);
1568     g_communicatorAggregator->ResetSendDelayInfo();
1569 }
1570 
1571 /**
1572   * @tc.name: RemoveDeviceData001
1573   * @tc.desc: call rekey and removeDeviceData Concurrently
1574   * @tc.type: FUNC
1575   * @tc.require: AR000D487B
1576   * @tc.author: zhuwentao
1577   */
1578 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RemoveDeviceData001, TestSize.Level1)
1579 {
1580     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1581     /**
1582      * @tc.steps: step1. sync deviceB data to A
1583      * * @tc.expected: step1. interface return ok
1584     */
1585     Key key1 = {'1'};
1586     Key key2 = {'2'};
1587     Value value = {'1'};
1588     g_deviceB->PutData(key1, value, 1, 0);
1589     g_deviceB->PutData(key2, value, 2, 0);
1590     g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true);
1591 
1592     Value actualValue;
1593     g_kvDelegatePtr->Get(key1, actualValue);
1594     EXPECT_EQ(actualValue, value);
1595     actualValue.clear();
1596     g_kvDelegatePtr->Get(key2, actualValue);
1597     EXPECT_EQ(actualValue, value);
1598     /**
1599      * @tc.steps: step2. call Rekey and RemoveDeviceData Concurrently
1600      * * @tc.expected: step2. interface return ok
1601     */
__anon61e499a11302() 1602     std::thread thread1([]() {
1603         CipherPassword passwd3;
1604         std::vector<uint8_t> passwdVect = {'p', 's', 'd', 'z'};
1605         passwd3.SetValue(passwdVect.data(), passwdVect.size());
1606         g_kvDelegatePtr->Rekey(passwd3);
1607     });
__anon61e499a11402() 1608     std::thread thread2([]() {
1609         g_kvDelegatePtr->RemoveDeviceData(g_deviceB->GetDeviceId());
1610     });
1611     thread1.join();
1612     thread2.join();
1613 }
1614 
1615 /**
1616   * @tc.name: DeviceOfflineSyncTask001
1617   * @tc.desc: Test sync task when device offline and close db Concurrently
1618   * @tc.type: FUNC
1619   * @tc.require: AR000HI2JS
1620   * @tc.author: zhuwentao
1621   */
1622 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask001, TestSize.Level3)
1623 {
1624     DBStatus status = OK;
1625     std::vector<std::string> devices;
1626     devices.push_back(g_deviceB->GetDeviceId());
1627 
1628     /**
1629      * @tc.steps: step1. deviceA put {k1, v1}
1630      */
1631     Key key = {'1'};
1632     Value value = {'1'};
1633     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1634 
1635     /**
1636      * @tc.steps: step2. deviceA set auto sync and put some key/value
1637      * @tc.expected: step2. interface should return OK.
1638      */
1639     bool autoSync = true;
1640     PragmaData data = static_cast<PragmaData>(&autoSync);
1641     status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1642     ASSERT_EQ(status, OK);
1643 
1644     Key key1 = {'2'};
1645     Key key2 = {'3'};
1646     Key key3 = {'4'};
1647     Key key4 = {'5'};
1648     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1649     ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value) == OK);
1650     ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value) == OK);
1651     ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value) == OK);
1652     ASSERT_TRUE(g_kvDelegatePtr->Put(key4, value) == OK);
1653     /**
1654      * @tc.steps: step3. device offline and close db Concurrently
1655      * @tc.expected: step3. interface should return OK.
1656      */
__anon61e499a11502() 1657     std::thread thread1([]() {
1658         g_mgr.CloseKvStore(g_kvDelegatePtr);
1659         g_kvDelegatePtr = nullptr;
1660     });
__anon61e499a11602() 1661     std::thread thread2([]() {
1662         g_deviceB->Offline();
1663     });
1664     thread1.join();
1665     thread2.join();
1666     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1667     ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1668 }
1669 
1670 /**
1671   * @tc.name: DeviceOfflineSyncTask002
1672   * @tc.desc: Test sync task when autoSync and close db Concurrently
1673   * @tc.type: FUNC
1674   * @tc.require:
1675   * @tc.author: zhuwentao
1676   */
1677 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask002, TestSize.Level3)
1678 {
1679     DBStatus status = OK;
1680     g_deviceC->Offline();
1681 
1682     /**
1683      * @tc.steps: step1. deviceA put {k1, v1}
1684      */
1685     Key key = {'1'};
1686     Value value = {'1'};
1687     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1688 
1689     /**
1690      * @tc.steps: step2. deviceA set auto sync and put some key/value
1691      * @tc.expected: step2. interface should return OK.
1692      */
1693     bool autoSync = true;
1694     PragmaData data = static_cast<PragmaData>(&autoSync);
1695     status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1696     ASSERT_EQ(status, OK);
1697     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME * 2));
1698 
1699     Key key1 = {'2'};
1700     Key key2 = {'3'};
1701     Key key3 = {'4'};
1702     ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value) == OK);
1703     ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value) == OK);
1704     ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value) == OK);
1705     /**
1706      * @tc.steps: step3. close db
1707      * @tc.expected: step3. interface should return OK.
1708      */
1709     g_mgr.CloseKvStore(g_kvDelegatePtr);
1710     g_kvDelegatePtr = nullptr;
1711     ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1712 }
1713 
1714 /**
1715   * @tc.name: DeviceOfflineSyncTask003
1716   * @tc.desc: Test sync task when device offline after call sync
1717   * @tc.type: FUNC
1718   * @tc.require:
1719   * @tc.author: zhuwentao
1720   */
1721 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask003, TestSize.Level3)
1722 {
1723     std::vector<std::string> devices;
1724     devices.push_back(g_deviceB->GetDeviceId());
1725 
1726     /**
1727      * @tc.steps: step1. deviceA put {k1, v1}
1728      */
1729     Key key = {'1'};
1730     Value value = {'1'};
1731     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1732     /**
1733      * @tc.steps: step2. device offline after call sync
1734      * @tc.expected: step2. interface should return OK.
1735      */
1736     Query query = Query::Select().PrefixKey(key);
1737     ASSERT_TRUE(g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, query, false) == OK);
1738     std::this_thread::sleep_for(std::chrono::milliseconds(15)); // wait for 15ms
1739     g_deviceB->Offline();
1740 }
1741 
1742 /**
1743   * @tc.name: GetSyncDataFail001
1744   * @tc.desc: test get sync data failed when sync
1745   * @tc.type: FUNC
1746   * @tc.require:
1747   * @tc.author: zhuwentao
1748   */
1749 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail001, TestSize.Level1)
1750 {
1751     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1752     /**
1753      * @tc.steps: step1. device B set get data errCode control and put some data
1754      * * @tc.expected: step1. interface return ok
1755     */
1756     g_deviceB->SetGetDataErrCode(1, -E_BUSY, true);
1757     Key key1 = {'1'};
1758     Value value = {'1'};
1759     EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1760     /**
1761      * @tc.steps: step2. device B sync to device A and check data
1762      * * @tc.expected: step2. interface return ok
1763     */
1764     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1765     Value actualValue;
1766     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1767     g_deviceB->ResetDataControl();
1768 }
1769 
1770 /**
1771   * @tc.name: GetSyncDataFail002
1772   * @tc.desc: test get sync data failed when sync with large data
1773   * @tc.type: FUNC
1774   * @tc.require: AR000D487B
1775   * @tc.author: zhuwentao
1776   */
1777 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail002, TestSize.Level1)
1778 {
1779     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1780     /**
1781      * @tc.steps: step1. device B set get data errCode control and put some data
1782      * * @tc.expected: step1. interface return ok
1783     */
1784     g_deviceB->SetGetDataErrCode(2, -E_BUSY, true);
1785     int totalSize = 4000u;
1786     std::vector<Entry> entries;
1787     std::vector<Key> keys;
1788     const int keyLen = 10; // 20 Bytes
1789     const int valueLen = 10; // 20 Bytes
1790     DistributedDBUnitTest::GenerateRecords(totalSize, entries, keys, keyLen, valueLen);
1791     uint32_t i = 1u;
1792     for (const auto &entry : entries) {
1793         EXPECT_EQ(g_deviceB->PutData(entry.key, entry.value, i, 0), E_OK);
1794         i++;
1795     }
1796     /**
1797      * @tc.steps: step2. device B sync to device A and check data
1798      * * @tc.expected: step2. interface return ok
1799     */
1800     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1801     std::this_thread::sleep_for(std::chrono::seconds(1));
1802     Value actualValue;
1803     for (int j = 1u; j <= totalSize; j++) {
1804         if (j > totalSize / 2) {
1805             EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), NOT_FOUND);
1806         } else {
1807             EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), OK);
1808         }
1809     }
1810     g_deviceB->ResetDataControl();
1811 }
1812 
1813 /**
1814   * @tc.name: GetSyncDataFail003
1815   * @tc.desc: test get sync data E_EKEYREVOKED failed in push_and_pull sync
1816   * @tc.type: FUNC
1817   * @tc.require:
1818   * @tc.author: zhuwentao
1819   */
1820 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail003, TestSize.Level1)
1821 {
1822     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1823     /**
1824      * @tc.steps: step1. device B set get data errCode control and put some data
1825      * * @tc.expected: step1. interface return ok
1826     */
1827     g_deviceB->SetGetDataErrCode(1, -E_EKEYREVOKED, true);
1828     Key key1 = {'1'};
1829     Key key2 = {'3'};
1830     Value value = {'1'};
1831     EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1832     EXPECT_EQ(g_kvDelegatePtr->Put(key2, value), OK);
1833     /**
1834      * @tc.steps: step2. device B sync to device A and check data
1835      * * @tc.expected: step2. interface return ok
1836     */
1837     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1838     Value actualValue;
1839     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1840     VirtualDataItem item;
1841     EXPECT_EQ(g_deviceB->GetData(key2, item), E_OK);
1842     g_deviceB->ResetDataControl();
1843 }
1844 
1845 /**
1846   * @tc.name: GetSyncDataFail004
1847   * @tc.desc: test get sync data E_EKEYREVOKED failed in push_and_pull sync
1848   * @tc.type: FUNC
1849   * @tc.require:
1850   * @tc.author: zhuwentao
1851   */
1852 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail004, TestSize.Level1)
1853 {
1854     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1855     /**
1856      * @tc.steps: step1. device B set get data errCode control and put some data
1857      * * @tc.expected: step1. interface return ok
1858     */
1859     g_deviceB->SetGetDataErrCode(2, -E_EKEYREVOKED, true);
1860     int totalSize = 4000u;
1861     std::vector<Entry> entries;
1862     std::vector<Key> keys;
1863     const int keyLen = 10; // 20 Bytes
1864     const int valueLen = 10; // 20 Bytes
1865     DistributedDBUnitTest::GenerateRecords(totalSize, entries, keys, keyLen, valueLen);
1866     uint32_t i = 1u;
1867     for (const auto &entry : entries) {
1868         EXPECT_EQ(g_deviceB->PutData(entry.key, entry.value, i, 0), E_OK);
1869         i++;
1870     }
1871     Key key = {'a', 'b', 'c'};
1872     Value value = {'1'};
1873     EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
1874     /**
1875      * @tc.steps: step2. device B sync to device A and check data
1876      * * @tc.expected: step2. interface return ok
1877     */
1878     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1879     std::this_thread::sleep_for(std::chrono::seconds(1));
1880     Value actualValue;
1881     for (int j = 1u; j <= totalSize; j++) {
1882         if (j > totalSize / 2) {
1883             EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), NOT_FOUND);
1884         } else {
1885             EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), OK);
1886         }
1887     }
1888     VirtualDataItem item;
1889     EXPECT_EQ(g_deviceB->GetData(key, item), E_OK);
1890     g_deviceB->ResetDataControl();
1891 }
1892 
1893 /**
1894   * @tc.name: InterceptDataFail001
1895   * @tc.desc: test intercept data failed when sync
1896   * @tc.type: FUNC
1897   * @tc.require:
1898   * @tc.author: zhuwentao
1899   */
1900 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, InterceptDataFail001, TestSize.Level1)
1901 {
1902     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1903     /**
1904      * @tc.steps: step1. device A set intercept data errCode and put some data
1905      * * @tc.expected: step1. interface return ok
1906     */
1907     g_kvDelegatePtr->SetPushDataInterceptor(
__anon61e499a11702(InterceptedData &data, const std::string &sourceID, const std::string &targetID) 1908         [](InterceptedData &data, const std::string &sourceID, const std::string &targetID) {
1909             int errCode = OK;
1910             auto entries = data.GetEntries();
1911             LOGD("====here111,size=%d", entries.size());
1912             for (size_t i = 0; i < entries.size(); i++) {
1913                 Key newKey;
1914                 errCode = data.ModifyKey(i, newKey);
1915                 if (errCode != OK) {
1916                     break;
1917                 }
1918             }
1919             return errCode;
1920         }
1921     );
1922     Key key = {'1'};
1923     Value value = {'1'};
1924     EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
1925     /**
1926      * @tc.steps: step2. device A sync to device B and check data
1927      * * @tc.expected: step2. interface return ok
1928     */
1929     std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
1930     std::map<std::string, DBStatus> result;
1931     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1932     ASSERT_TRUE(result.size() == devices.size());
1933     for (const auto &pair : result) {
1934         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1935         EXPECT_TRUE(pair.second == INTERCEPT_DATA_FAIL);
1936     }
1937     VirtualDataItem item;
1938     EXPECT_EQ(g_deviceB->GetData(key, item), -E_NOT_FOUND);
1939 }
1940 
1941 /**
1942   * @tc.name: InterceptDataFail002
1943   * @tc.desc: test intercept data failed when sync
1944   * @tc.type: FUNC
1945   * @tc.require:
1946   * @tc.author: zhangqiquan
1947   */
1948 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, InterceptDataFail002, TestSize.Level0)
1949 {
1950     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1951     /**
1952      * @tc.steps: step1. device A set intercept data errCode and B put some data
1953      * @tc.expected: step1. interface return ok
1954      */
1955     g_kvDelegatePtr->SetReceiveDataInterceptor(
__anon61e499a11802(InterceptedData &data, const std::string &sourceID, const std::string &targetID) 1956         [](InterceptedData &data, const std::string &sourceID, const std::string &targetID) {
1957             auto entries = data.GetEntries();
1958             LOGD("====on receive,size=%d", entries.size());
1959             for (size_t i = 0; i < entries.size(); i++) {
1960                 Key newKey;
1961                 int errCode = data.ModifyKey(i, newKey);
1962                 if (errCode != OK) {
1963                     return errCode;
1964                 }
1965             }
1966             return E_OK;
1967         }
1968     );
1969     Key key = {'1'};
1970     Value value = {'1'};
1971     g_deviceB->PutData(key, value, 1u, 0); // 1 is timestamp
1972     /**
1973      * @tc.steps: step2. device A sync to device B and check data
1974      * @tc.expected: step2. interface return ok
1975      */
1976     std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
1977     std::map<std::string, DBStatus> result;
1978     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
1979     ASSERT_TRUE(result.size() == devices.size());
1980     for (const auto &pair : result) {
1981         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1982         EXPECT_EQ(pair.second, INTERCEPT_DATA_FAIL);
1983     }
1984     Value actualValue;
1985     EXPECT_EQ(g_kvDelegatePtr->Get(key, actualValue), NOT_FOUND);
1986 }
1987 
1988 /**
1989   * @tc.name: InterceptData001
1990   * @tc.desc: test intercept receive data when sync
1991   * @tc.type: FUNC
1992   * @tc.require:
1993   * @tc.author: zhangqiquan
1994   */
1995 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, InterceptData001, TestSize.Level0)
1996 {
1997     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1998     /**
1999      * @tc.steps: step1. device A set intercept data errCode and B put some data
2000      * @tc.expected: step1. interface return ok
2001      */
2002     g_kvDelegatePtr->SetReceiveDataInterceptor(
__anon61e499a11902(InterceptedData &data, const std::string &sourceID, const std::string &targetID) 2003         [](InterceptedData &data, const std::string &sourceID, const std::string &targetID) {
2004             auto entries = data.GetEntries();
2005             LOGD("====on receive,size=%d", entries.size());
2006             for (size_t i = 0; i < entries.size(); i++) {
2007                 Key newKey = {'2'};
2008                 int errCode = data.ModifyKey(i, newKey);
2009                 if (errCode != OK) {
2010                     return errCode;
2011                 }
2012                 Value newValue = {'3'};
2013                 errCode = data.ModifyValue(i, newValue);
2014                 if (errCode != OK) {
2015                     return errCode;
2016                 }
2017             }
2018             return E_OK;
2019         }
2020     );
2021     Key key = {'1'};
2022     Value value = {'1'};
2023     g_deviceB->PutData(key, value, 1u, 0); // 1 is timestamp
2024     /**
2025      * @tc.steps: step2. device A sync to device B and check data
2026      * @tc.expected: step2. interface return ok
2027      */
2028     std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2029     std::map<std::string, DBStatus> result;
2030     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
2031     ASSERT_TRUE(result.size() == devices.size());
2032     for (const auto &pair : result) {
2033         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2034         EXPECT_EQ(pair.second, OK);
2035     }
2036     Value actualValue;
2037     EXPECT_EQ(g_kvDelegatePtr->Get(key, actualValue), NOT_FOUND);
2038     key = {'2'};
2039     EXPECT_EQ(g_kvDelegatePtr->Get(key, actualValue), OK);
2040     value = {'3'};
2041     EXPECT_EQ(actualValue, value);
2042 }
2043 
2044 /**
2045   * @tc.name: UpdateKey001
2046   * @tc.desc: test update key can effect local data and sync data, without delete data
2047   * @tc.type: FUNC
2048   * @tc.require:
2049   * @tc.author: zhangqiquan
2050   */
2051 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, UpdateKey001, TestSize.Level1)
2052 {
2053     /**
2054      * @tc.steps: step1. device A set sync data (k1, v1) local data (k2, v2) (k3, v3) and delete (k4, v4)
2055      * @tc.expected: step1. put data return ok
2056      */
2057     Key k1 = {'k', '1'};
2058     Value v1 = {'v', '1'};
2059     g_deviceB->PutData(k1, v1, 1, 0);
2060     ASSERT_EQ(g_deviceB->Sync(SyncMode::SYNC_MODE_PUSH_ONLY, true), E_OK);
2061     Value actualValue;
2062     EXPECT_EQ(g_kvDelegatePtr->Get(k1, actualValue), OK);
2063     EXPECT_EQ(v1, actualValue);
2064     Key k2 = {'k', '2'};
2065     Value v2 = {'v', '2'};
2066     Key k3 = {'k', '3'};
2067     Value v3 = {'v', '3'};
2068     Key k4 = {'k', '4'};
2069     Value v4 = {'v', '4'};
2070     EXPECT_EQ(g_kvDelegatePtr->Put(k2, v2), OK);
2071     EXPECT_EQ(g_kvDelegatePtr->Put(k3, v3), OK);
2072     EXPECT_EQ(g_kvDelegatePtr->Put(k4, v4), OK);
2073     EXPECT_EQ(g_kvDelegatePtr->Delete(k4), OK);
2074     /**
2075      * @tc.steps: step2. device A update key and set
2076      * @tc.expected: step2. put data return ok
2077      */
__anon61e499a11a02(const Key &originKey, Key &newKey) 2078     DBStatus status = g_kvDelegatePtr->UpdateKey([](const Key &originKey, Key &newKey) {
2079         newKey = originKey;
2080         newKey.push_back('0');
2081     });
2082     EXPECT_EQ(status, OK);
2083     k1.push_back('0');
2084     k2.push_back('0');
2085     k3.push_back('0');
2086     EXPECT_EQ(g_kvDelegatePtr->Get(k1, actualValue), OK);
2087     EXPECT_EQ(v1, actualValue);
2088     EXPECT_EQ(g_kvDelegatePtr->Get(k2, actualValue), OK);
2089     EXPECT_EQ(v2, actualValue);
2090     EXPECT_EQ(g_kvDelegatePtr->Get(k3, actualValue), OK);
2091     EXPECT_EQ(v3, actualValue);
2092 }
2093 
2094 /**
2095   * @tc.name: MetaBusy001
2096   * @tc.desc: test sync normal when update water mark busy
2097   * @tc.type: FUNC
2098   * @tc.require:
2099   * @tc.author: zhangqiquan
2100   */
2101 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, MetaBusy001, TestSize.Level1)
2102 {
2103     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2104     Key key = {'1'};
2105     Value value = {'1'};
2106     EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
2107     std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2108     std::map<std::string, DBStatus> result;
2109     ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result), OK);
2110     ASSERT_EQ(result.size(), devices.size());
2111     for (const auto &pair : result) {
2112         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2113         EXPECT_TRUE(pair.second == OK);
2114     }
2115     value = {'2'};
2116     EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
__anon61e499a11b02() 2117     g_deviceB->SetSaveDataCallback([] () {
2118         RuntimeContext::GetInstance()->ScheduleTask([]() {
2119             g_deviceB->EraseWaterMark("real_device");
2120         });
2121         std::this_thread::sleep_for(std::chrono::seconds(1));
2122     });
2123     EXPECT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result), OK);
2124     EXPECT_EQ(result.size(), devices.size());
2125     for (const auto &pair : result) {
2126         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2127         EXPECT_TRUE(pair.second == OK);
2128     }
2129     g_deviceB->SetSaveDataCallback(nullptr);
2130     RuntimeContext::GetInstance()->StopTaskPool();
2131 }