• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <condition_variable>
17 #include <gtest/gtest.h>
18 #include <thread>
19 
20 #include "db_constant.h"
21 #include "db_common.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "mock_sync_task_context.h"
27 #include "platform_specific.h"
28 #include "single_ver_data_sync.h"
29 #include "single_ver_kv_sync_task_context.h"
30 
31 using namespace testing::ext;
32 using namespace DistributedDB;
33 using namespace DistributedDBUnitTest;
34 using namespace std;
35 
36 namespace {
37     class TestSingleVerKvSyncTaskContext : public SingleVerKvSyncTaskContext {
38     public:
39         TestSingleVerKvSyncTaskContext() = default;
40     };
41     string g_testDir;
42     const string STORE_ID = "kv_stroe_complex_sync_test";
43     const int WAIT_TIME = 1000;
44     const std::string DEVICE_A = "real_device";
45     const std::string DEVICE_B = "deviceB";
46     const std::string DEVICE_C = "deviceC";
47     const std::string CREATE_SYNC_TABLE_SQL =
48     "CREATE TABLE IF NOT EXISTS sync_data(" \
49         "key         BLOB NOT NULL," \
50         "value       BLOB," \
51         "timestamp   INT  NOT NULL," \
52         "flag        INT  NOT NULL," \
53         "device      BLOB," \
54         "ori_device  BLOB," \
55         "hash_key    BLOB PRIMARY KEY NOT NULL," \
56         "w_timestamp INT," \
57         "modify_time INT," \
58         "create_time INT" \
59         ");";
60 
61     KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
62     KvStoreConfig g_config;
63     DistributedDBToolsUnitTest g_tool;
64     DBStatus g_kvDelegateStatus = INVALID_ARGS;
65     KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
66     VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
67     KvVirtualDevice *g_deviceB = nullptr;
68     KvVirtualDevice *g_deviceC = nullptr;
69 
70     // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
71     auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
72         placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
73 
PullSyncTest()74     void PullSyncTest()
75     {
76         DBStatus status = OK;
77         std::vector<std::string> devices;
78         devices.push_back(g_deviceB->GetDeviceId());
79 
80         Key key = {'1'};
81         Key key2 = {'2'};
82         Value value = {'1'};
83         g_deviceB->PutData(key, value, 0, 0);
84         g_deviceB->PutData(key2, value, 1, 0);
85 
86         std::map<std::string, DBStatus> result;
87         status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
88         ASSERT_TRUE(status == OK);
89 
90         ASSERT_TRUE(result.size() == devices.size());
91         for (const auto &pair : result) {
92             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
93             EXPECT_TRUE(pair.second == OK);
94         }
95         Value value3;
96         EXPECT_EQ(g_kvDelegatePtr->Get(key, value3), OK);
97         EXPECT_EQ(value3, value);
98         EXPECT_EQ(g_kvDelegatePtr->Get(key2, value3), OK);
99         EXPECT_EQ(value3, value);
100     }
101 
CrudTest()102     void CrudTest()
103     {
104         vector<Entry> entries;
105         int totalSize = 10;
106         for (int i = 0; i < totalSize; i++) {
107             Entry entry;
108             entry.key.push_back(i);
109             entry.value.push_back('2');
110             entries.push_back(entry);
111         }
112         EXPECT_TRUE(g_kvDelegatePtr->PutBatch(entries) == OK);
113         for (const auto &entry : entries) {
114             Value resultvalue;
115             EXPECT_TRUE(g_kvDelegatePtr->Get(entry.key, resultvalue) == OK);
116             EXPECT_TRUE(resultvalue == entry.value);
117         }
118         for (int i = 0; i < totalSize / 2; i++) { // 2: Half of the total
119             g_kvDelegatePtr->Delete(entries[i].key);
120             Value resultvalue;
121             EXPECT_TRUE(g_kvDelegatePtr->Get(entries[i].key, resultvalue) == NOT_FOUND);
122         }
123         for (int i = totalSize / 2; i < totalSize; i++) {
124             Value value = entries[i].value;
125             value.push_back('x');
126             EXPECT_TRUE(g_kvDelegatePtr->Put(entries[i].key, value) == OK);
127             Value resultvalue;
128             EXPECT_TRUE(g_kvDelegatePtr->Get(entries[i].key, resultvalue) == OK);
129             EXPECT_TRUE(resultvalue == value);
130         }
131     }
132 
DataSync005()133     void DataSync005()
134     {
135         ASSERT_NE(g_communicatorAggregator, nullptr);
136         SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
137         ASSERT_TRUE(dataSync != nullptr);
138         dataSync->SendSaveDataNotifyPacket(nullptr, 0, 0, 0, TIME_SYNC_MESSAGE);
139         EXPECT_EQ(g_communicatorAggregator->GetOnlineDevices().size(), 3u); // 3 online dev
140         delete dataSync;
141     }
142 
DataSync008()143     void DataSync008()
144     {
145         SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
146         ASSERT_TRUE(dataSync != nullptr);
147         auto context = new (std::nothrow) MockSyncTaskContext();
148         dataSync->PutDataMsg(nullptr);
149         bool isNeedHandle = false;
150         bool isContinue = false;
151         EXPECT_EQ(dataSync->MoveNextDataMsg(context, isNeedHandle, isContinue), nullptr);
152         EXPECT_EQ(isNeedHandle, false);
153         EXPECT_EQ(isContinue, false);
154         delete dataSync;
155         delete context;
156     }
157 
ReSetWaterDogTest001()158     void ReSetWaterDogTest001()
159     {
160         /**
161          * @tc.steps: step1. put 10 key/value
162          * @tc.expected: step1, put return OK.
163          */
164         for (int i = 0; i < 5; i++) { // put 5 key
165             Key key = DistributedDBToolsUnitTest::GetRandPrefixKey({'a', 'b'}, 1024); // rand num 1024 for test
166             Value value;
167             DistributedDBToolsUnitTest::GetRandomKeyValue(value, 10 * 50 * 1024u); // 10 * 50 * 1024 = 500k
168             EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
169         }
170         /**
171          * @tc.steps: step2. SetDeviceMtuSize
172          * @tc.expected: step2, return OK.
173          */
174         g_communicatorAggregator->SetDeviceMtuSize(DEVICE_A, 50 * 1024u); // 50 * 1024u = 50k
175         g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, 50 * 1024u); // 50 * 1024u = 50k
176         /**
177          * @tc.steps: step3. deviceA,deviceB sync to each other at same time
178          * @tc.expected: step3. sync should return OK.
179          */
180         EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, true), E_OK);
181         g_communicatorAggregator->SetDeviceMtuSize(DEVICE_A, 5 * 1024u * 1024u); // 5 * 1024u * 1024u = 5m
182         g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, 5 * 1024u * 1024u); // 5 * 1024u * 1024u = 5m
183     }
184 }
185 
186 class DistributedDBSingleVerP2PComplexSyncTest : public testing::Test {
187 public:
188     static void SetUpTestCase(void);
189     static void TearDownTestCase(void);
190     void SetUp();
191     void TearDown();
192 };
193 
SetUpTestCase(void)194 void DistributedDBSingleVerP2PComplexSyncTest::SetUpTestCase(void)
195 {
196     /**
197      * @tc.setup: Init datadir and Virtual Communicator.
198      */
199     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
200     g_config.dataDir = g_testDir;
201     g_mgr.SetKvStoreConfig(g_config);
202 
203     string dir = g_testDir + "/single_ver";
204     DIR* dirTmp = opendir(dir.c_str());
205     if (dirTmp == nullptr) {
206         OS::MakeDBDirectory(dir);
207     } else {
208         closedir(dirTmp);
209     }
210 
211     g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
212     ASSERT_TRUE(g_communicatorAggregator != nullptr);
213     RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
214 }
215 
TearDownTestCase(void)216 void DistributedDBSingleVerP2PComplexSyncTest::TearDownTestCase(void)
217 {
218     /**
219      * @tc.teardown: Release virtual Communicator and clear data dir.
220      */
221     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
222         LOGE("rm test db files error!");
223     }
224     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
225 }
226 
SetUp(void)227 void DistributedDBSingleVerP2PComplexSyncTest::SetUp(void)
228 {
229     DistributedDBToolsUnitTest::PrintTestCaseInfo();
230     /**
231      * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
232      */
233     KvStoreNbDelegate::Option option;
234     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
235     ASSERT_TRUE(g_kvDelegateStatus == OK);
236     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
237     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
238     ASSERT_TRUE(g_deviceB != nullptr);
239     VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
240     ASSERT_TRUE(syncInterfaceB != nullptr);
241     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
242 
243     g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
244     ASSERT_TRUE(g_deviceC != nullptr);
245     VirtualSingleVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
246     ASSERT_TRUE(syncInterfaceC != nullptr);
247     ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
248 
249     auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
250         const std::string &deviceId, uint8_t flag) -> bool {
251             return true;
252         };
253     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
254 }
255 
TearDown(void)256 void DistributedDBSingleVerP2PComplexSyncTest::TearDown(void)
257 {
258     /**
259      * @tc.teardown: Release device A, B, C
260      */
261     if (g_kvDelegatePtr != nullptr) {
262         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
263         g_kvDelegatePtr = nullptr;
264         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
265         LOGD("delete kv store status %d", status);
266         ASSERT_TRUE(status == OK);
267     }
268     if (g_deviceB != nullptr) {
269         delete g_deviceB;
270         g_deviceB = nullptr;
271     }
272     if (g_deviceC != nullptr) {
273         delete g_deviceC;
274         g_deviceC = nullptr;
275     }
276     PermissionCheckCallbackV2 nullCallback;
277     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
278 }
279 
280 /**
281   * @tc.name: SaveDataNotify001
282   * @tc.desc: Test SaveDataNotify function, delay < 30s should sync ok, > 36 should timeout
283   * @tc.type: FUNC
284   * @tc.require: AR000D4876
285   * @tc.author: xushaohua
286   */
287 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SaveDataNotify001, TestSize.Level3)
288 {
289     DBStatus status = OK;
290     const int waitFiveSeconds = 5000;
291     const int waitThirtySeconds = 30000;
292     const int waitThirtySixSeconds = 36000;
293     std::vector<std::string> devices;
294     devices.push_back(g_deviceB->GetDeviceId());
295 
296     /**
297      * @tc.steps: step1. deviceA put {k1, v1}
298      */
299     Key key = {'1'};
300     Value value = {'1'};
301     status = g_kvDelegatePtr->Put(key, value);
302     ASSERT_TRUE(status == OK);
303 
304     /**
305      * @tc.steps: step2. deviceB set sava data dely 5s
306      */
307     g_deviceB->SetSaveDataDelayTime(waitFiveSeconds);
308 
309     /**
310      * @tc.steps: step3. deviceA call sync and wait
311      * @tc.expected: step3. sync should return OK. onComplete should be called, deviceB sync success.
312      */
313     std::map<std::string, DBStatus> result;
314     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
315     ASSERT_TRUE(status == OK);
316     ASSERT_TRUE(result.size() == devices.size());
317     ASSERT_TRUE(result[DEVICE_B] == OK);
318 
319     /**
320      * @tc.steps: step4. deviceB set sava data dely 30s and put {k1, v1}
321      */
322     g_deviceB->SetSaveDataDelayTime(waitThirtySeconds);
323     status = g_kvDelegatePtr->Put(key, value);
324     ASSERT_TRUE(status == OK);
325      /**
326      * @tc.steps: step3. deviceA call sync and wait
327      * @tc.expected: step3. sync should return OK. onComplete should be called, deviceB sync success.
328      */
329     result.clear();
330     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
331     ASSERT_TRUE(status == OK);
332     ASSERT_TRUE(result.size() == devices.size());
333     ASSERT_TRUE(result[DEVICE_B] == OK);
334 
335     /**
336      * @tc.steps: step4. deviceB set sava data dely 36s and put {k1, v1}
337      */
338     g_deviceB->SetSaveDataDelayTime(waitThirtySixSeconds);
339     status = g_kvDelegatePtr->Put(key, value);
340     ASSERT_TRUE(status == OK);
341     /**
342      * @tc.steps: step5. deviceA call sync and wait
343      * @tc.expected: step5. sync should return OK. onComplete should be called, deviceB sync TIME_OUT.
344      */
345     result.clear();
346     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
347     ASSERT_TRUE(status == OK);
348     ASSERT_TRUE(result.size() == devices.size());
349     ASSERT_TRUE(result[DEVICE_B] == TIME_OUT);
350 }
351 
352 /**
353   * @tc.name: SametimeSync001
354   * @tc.desc: Test 2 device sync with each other
355   * @tc.type: FUNC
356   * @tc.require: AR000CCPOM
357   * @tc.author: zhangqiquan
358   */
359 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SametimeSync001, TestSize.Level3)
360 {
361     DBStatus status = OK;
362     std::vector<std::string> devices;
363     devices.push_back(g_deviceB->GetDeviceId());
364 
365     int responseCount = 0;
366     int requestCount = 0;
367     Key key = {'1'};
368     Value value = {'1'};
369     /**
370      * @tc.steps: step1. make sure deviceB send pull firstly and response_pull secondly
371      * @tc.expected: step1. deviceA put data when finish push task. put data should return OK.
372      */
373     g_communicatorAggregator->RegOnDispatch([&responseCount, &requestCount, &key, &value](
__anon52917de10302( const std::string &target, DistributedDB::Message *msg) 374         const std::string &target, DistributedDB::Message *msg) {
375         if (target != "real_device" || msg->GetMessageId() != DATA_SYNC_MESSAGE) {
376             return;
377         }
378 
379         if (msg->GetMessageType() == TYPE_RESPONSE) {
380             responseCount++;
381             if (responseCount == 1) { // 1 is the ack which B response A's push task
382                 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), DBStatus::OK);
383                 std::this_thread::sleep_for(std::chrono::seconds(1));
384             } else if (responseCount == 2) { // 2 is the ack which B response A's response_pull task
385                 msg->SetErrorNo(E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
386             }
387         } else if (msg->GetMessageType() == TYPE_REQUEST) {
388             requestCount++;
389             if (requestCount == 1) { // 1 is A push task
390                 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2 sec
391             }
392         }
393     });
394     /**
395      * @tc.steps: step2. deviceA,deviceB sync to each other at same time
396      * @tc.expected: step2. sync should return OK.
397      */
398     std::map<std::string, DBStatus> result;
__anon52917de10402null399     std::thread subThread([]{
400         g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, true);
401     });
402     status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_PULL, result);
403     subThread.join();
404     g_communicatorAggregator->RegOnDispatch(nullptr);
405 
406     EXPECT_TRUE(status == OK);
407     ASSERT_TRUE(result.size() == devices.size());
408     EXPECT_TRUE(result[DEVICE_B] == OK);
409     Value actualValue;
410     g_kvDelegatePtr->Get(key, actualValue);
411     EXPECT_EQ(actualValue, value);
412 }
413 
414 /**
415   * @tc.name: SametimeSync002
416   * @tc.desc: Test 2 device sync with each other with water error
417   * @tc.type: FUNC
418   * @tc.require: AR000CCPOM
419   * @tc.author: zhangqiquan
420   */
421 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SametimeSync002, TestSize.Level3)
422 {
423     DBStatus status = OK;
424     std::vector<std::string> devices;
425     devices.push_back(g_deviceB->GetDeviceId());
426     g_kvDelegatePtr->Put({'k', '1'}, {'v', '1'});
427     /**
428      * @tc.steps: step1. make sure deviceA push data failed and increase water mark
429      * @tc.expected: step1. deviceA push failed with timeout
430      */
__anon52917de10502(const std::string &target, DistributedDB::Message *msg) 431     g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
432         ASSERT_NE(msg, nullptr);
433         if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
434             msg->SetMessageId(INVALID_MESSAGE_ID);
435         }
436     });
437     std::map<std::string, DBStatus> result;
__anon52917de10602(const std::map<std::string, DBStatus> &map) 438     auto callback = [&result](const std::map<std::string, DBStatus> &map) {
439         result = map;
440     };
441     Query query = Query::Select().PrefixKey({'k', '1'});
442     EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
443     ASSERT_TRUE(result.size() == devices.size());
444     EXPECT_TRUE(result[DEVICE_B] == TIME_OUT);
445     /**
446      * @tc.steps: step2. A push to B with query2, sleep 1s for waiting step3
447      * @tc.expected: step2. sync should return OK.
448      */
__anon52917de10702(const std::string &target, DistributedDB::Message *msg) 449     g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
450         ASSERT_NE(msg, nullptr);
451         if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
452             std::this_thread::sleep_for(std::chrono::seconds(1));
453         }
454     });
__anon52917de10802null455     std::thread subThread([&devices] {
456         std::map<std::string, DBStatus> result;
457         auto callback = [&result](const std::map<std::string, DBStatus> &map) {
458             result = map;
459         };
460         Query query = Query::Select().PrefixKey({'k', '2'});
461         LOGD("Begin PUSH");
462         EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
463         ASSERT_TRUE(result.size() == devices.size());
464         EXPECT_TRUE(result[DEVICE_B] == OK);
465     });
466     /**
467      * @tc.steps: step3. B pull to A when A is in push task
468      * @tc.expected: step3. sync should return OP_FINISHED_ALL.
469      */
470     std::this_thread::sleep_for(std::chrono::milliseconds(100));
471     std::map<std::string, int> virtualResult;
472     g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, query,
__anon52917de10a02(const std::map<std::string, int> &map) 473         [&virtualResult](const std::map<std::string, int> &map) {
474             virtualResult = map;
475         }, true);
476     EXPECT_TRUE(status == OK);
477     ASSERT_EQ(virtualResult.size(), devices.size());
478     EXPECT_EQ(virtualResult[DEVICE_A], SyncOperation::OP_FINISHED_ALL);
479     g_communicatorAggregator->RegOnDispatch(nullptr);
480     subThread.join();
481 }
482 
483 /**
484  * @tc.name: DatabaseOnlineCallback001
485  * @tc.desc: check database status notify online callback
486  * @tc.type: FUNC
487  * @tc.require: AR000CQS3S SR000CQE0B
488  * @tc.author: zhuwentao
489  */
490 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DatabaseOnlineCallback001, TestSize.Level1)
491 {
492     /**
493      * @tc.steps: step1. SetStoreStatusNotifier
494      * @tc.expected: step1. SetStoreStatusNotifier ok
495      */
496     std::string targetDev = "DEVICE_X";
497     bool isCheckOk = false;
498     auto databaseStatusNotifyCallback = [targetDev, &isCheckOk] (const std::string &userId,
__anon52917de10b02(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) 499         const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) -> void {
500         if (userId == USER_ID && appId == APP_ID && storeId == STORE_ID && deviceId == targetDev &&
501             onlineStatus == true) {
502             isCheckOk = true;
503         }};
504     g_mgr.SetStoreStatusNotifier(databaseStatusNotifyCallback);
505     /**
506      * @tc.steps: step2. trigger device online
507      * @tc.expected: step2. check callback ok
508      */
509     g_communicatorAggregator->OnlineDevice(targetDev);
510     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME / 20));
511     EXPECT_EQ(isCheckOk, true);
512     StoreStatusNotifier nullCallback;
513     g_mgr.SetStoreStatusNotifier(nullCallback);
514 }
515 
516 /**
517  * @tc.name: DatabaseOfflineCallback001
518  * @tc.desc: check database status notify offline callback
519  * @tc.type: FUNC
520  * @tc.require: AR000CQS3S SR000CQE0B
521  * @tc.author: zhuwentao
522  */
523 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DatabaseOfflineCallback001, TestSize.Level1)
524 {
525     /**
526      * @tc.steps: step1. SetStoreStatusNotifier
527      * @tc.expected: step1. SetStoreStatusNotifier ok
528      */
529     std::string targetDev = "DEVICE_X";
530     bool isCheckOk = false;
531     auto databaseStatusNotifyCallback = [targetDev, &isCheckOk] (const std::string &userId,
__anon52917de10c02(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) 532         const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) -> void {
533         if (userId == USER_ID && appId == APP_ID && storeId == STORE_ID && deviceId == targetDev &&
534             onlineStatus == false) {
535             isCheckOk = true;
536         }};
537     g_mgr.SetStoreStatusNotifier(databaseStatusNotifyCallback);
538     /**
539      * @tc.steps: step2. trigger device offline
540      * @tc.expected: step2. check callback ok
541      */
542     g_communicatorAggregator->OfflineDevice(targetDev);
543     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME / 20));
544     EXPECT_EQ(isCheckOk, true);
545     StoreStatusNotifier nullCallback;
546     g_mgr.SetStoreStatusNotifier(nullCallback);
547 }
548 
549 /**
550   * @tc.name: CloseSync001
551   * @tc.desc: Test 2 delegate close when sync
552   * @tc.type: FUNC
553   * @tc.require: AR000CCPOM
554   * @tc.author: zhangqiquan
555   */
556 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, CloseSync001, TestSize.Level3)
557 {
558     DBStatus status = OK;
559     std::vector<std::string> devices;
560     devices.push_back(g_deviceB->GetDeviceId());
561 
562     /**
563      * @tc.steps: step1. make sure A sync start
564      */
565     bool sleep = false;
__anon52917de10d02(const std::string &target, DistributedDB::Message *msg) 566     g_communicatorAggregator->RegOnDispatch([&sleep](const std::string &target, DistributedDB::Message *msg) {
567         if (!sleep) {
568             sleep = true;
569             std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s for waiting close db
570         }
571     });
572 
573     KvStoreNbDelegate* kvDelegatePtrA = nullptr;
574     KvStoreNbDelegate::Option option;
__anon52917de10e02(DBStatus s, KvStoreNbDelegate *delegate) 575     g_mgr.GetKvStore(STORE_ID, option, [&status, &kvDelegatePtrA](DBStatus s, KvStoreNbDelegate *delegate) {
576         status = s;
577         kvDelegatePtrA = delegate;
578     });
579     EXPECT_EQ(status, OK);
580     EXPECT_NE(kvDelegatePtrA, nullptr);
581 
582     Key key = {'k'};
583     Value value = {'v'};
584     kvDelegatePtrA->Put(key, value);
585     std::map<std::string, DBStatus> result;
__anon52917de10f02(const std::map<std::string, DBStatus>& statusMap) 586     auto callback = [&result](const std::map<std::string, DBStatus>& statusMap) {
587         result = statusMap;
588     };
589     /**
590      * @tc.steps: step2. deviceA sync and then close
591      * @tc.expected: step2. sync should abort and data don't exist in B
592      */
__anon52917de11002() 593     std::thread closeThread([&kvDelegatePtrA]() {
594         std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for waiting sync start
595         EXPECT_EQ(g_mgr.CloseKvStore(kvDelegatePtrA), OK);
596     });
597     EXPECT_EQ(kvDelegatePtrA->Sync(devices, SYNC_MODE_PUSH_ONLY, callback, false), OK);
598     LOGD("Sync finish");
599     closeThread.join();
600     std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s for waiting sync finish
601     EXPECT_EQ(result.size(), 0u);
602     VirtualDataItem actualValue;
603     EXPECT_EQ(g_deviceB->GetData(key, actualValue), -E_NOT_FOUND);
604     g_communicatorAggregator->RegOnDispatch(nullptr);
605 }
606 
607 /**
608   * @tc.name: CloseSync002
609   * @tc.desc: Test 1 delegate close when in time sync
610   * @tc.type: FUNC
611   * @tc.require: AR000CCPOM
612   * @tc.author: zhangqiquan
613   */
614 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, CloseSync002, TestSize.Level3)
615 {
616     /**
617      * @tc.steps: step1. invalid time sync packet from A
618      */
__anon52917de11102(const std::string &target, DistributedDB::Message *msg) 619     g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
620         ASSERT_NE(msg, nullptr);
621         if (target == DEVICE_B && msg->GetMessageId() == TIME_SYNC_MESSAGE && msg->GetMessageType() == TYPE_REQUEST) {
622             msg->SetMessageId(INVALID_MESSAGE_ID);
623             LOGD("Message is invalid");
624         }
625     });
626     Timestamp currentTime;
627     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
628     g_deviceB->PutData({'k'}, {'v'}, currentTime, 0);
629 
630     /**
631      * @tc.steps: step2. B PUSH to A and A close after 1s
632      * @tc.expected: step2. A closing time cost letter than 4s
633      */
__anon52917de11202() 634     std::thread closingThread([]() {
635         std::this_thread::sleep_for(std::chrono::seconds(1));
636         LOGD("Begin Close");
637         Timestamp beginTime;
638         (void)OS::GetCurrentSysTimeInMicrosecond(beginTime);
639         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
640         Timestamp endTime;
641         (void)OS::GetCurrentSysTimeInMicrosecond(endTime);
642         EXPECT_LE(static_cast<int>(endTime - beginTime), 4 * 1000 * 1000); // waiting 4 * 1000 * 1000 us
643         LOGD("End Close");
644     });
645     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
646     closingThread.join();
647 
648     /**
649      * @tc.steps: step3. remove db
650      * @tc.expected: step3. remove ok
651      */
652     g_kvDelegatePtr = nullptr;
653     DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
654     LOGD("delete kv store status %d", status);
655     ASSERT_TRUE(status == OK);
656     g_communicatorAggregator->RegOnDispatch(nullptr);
657 }
658 
659 /**
660   * @tc.name: OrderbyWriteTimeSync001
661   * @tc.desc: sync query with order by writeTime
662   * @tc.type: FUNC
663   * @tc.require: AR000H5VLO
664   * @tc.author: zhuwentao
665   */
666 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, OrderbyWriteTimeSync001, TestSize.Level0)
667 {
668     /**
669      * @tc.steps: step1. deviceA subscribe query with order by write time
670      * * @tc.expected: step1. interface return not support
671     */
672     std::vector<std::string> devices;
673     devices.push_back(g_deviceB->GetDeviceId());
674     Query query = Query::Select().PrefixKey({'k'}).OrderByWriteTime(true);
675     EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, nullptr, query, true), NOT_SUPPORT);
676 }
677 
678 
679 /**
680  * @tc.name: Device Offline Sync 001
681  * @tc.desc: Test push sync when device offline
682  * @tc.type: FUNC
683  * @tc.require: AR000CCPOM
684  * @tc.author: xushaohua
685  */
686 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSync001, TestSize.Level1)
687 {
688     std::vector<std::string> devices;
689     devices.push_back(g_deviceB->GetDeviceId());
690     devices.push_back(g_deviceC->GetDeviceId());
691 
692     /**
693      * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2}, {k3 delete}, {k4,v2}
694      */
695     Key key1 = {'1'};
696     Value value1 = {'1'};
697     ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value1) == OK);
698 
699     Key key2 = {'2'};
700     Value value2 = {'2'};
701     ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value2) == OK);
702 
703     Key key3 = {'3'};
704     Value value3 = {'3'};
705     ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value3) == OK);
706     ASSERT_TRUE(g_kvDelegatePtr->Delete(key3) == OK);
707 
708     Key key4 = {'4'};
709     Value value4 = {'4'};
710     ASSERT_TRUE(g_kvDelegatePtr->Put(key4, value4) == OK);
711 
712     /**
713      * @tc.steps: step2. deviceB offline
714      */
715     g_deviceB->Offline();
716 
717     /**
718      * @tc.steps: step3. deviceA call pull sync
719      * @tc.expected: step3. sync should return OK.
720      */
721     std::map<std::string, DBStatus> result;
722     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
723     ASSERT_TRUE(status == OK);
724 
725     /**
726      * @tc.expected: step3. onComplete should be called, DeviceB status is timeout
727      *     deviceC has {k1, v1}, {k2, v2}, {k3 delete}, {k4,v4}
728      */
729     for (const auto &pair : result) {
730         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
731         if (pair.first == DEVICE_B) {
732             // If syncTaskContext of deviceB is scheduled to be executed first, ClearAllSyncTask is
733             // invoked when OfflineHandleByDevice is triggered, and SyncOperation::Finished() is triggered in advance.
734             // The returned status is COMM_FAILURE
735             EXPECT_TRUE((pair.second == static_cast<DBStatus>(-E_PERIPHERAL_INTERFACE_FAIL)) ||
736                 (pair.second == COMM_FAILURE));
737         } else {
738             EXPECT_EQ(pair.second, OK);
739         }
740     }
741     VirtualDataItem item;
742     g_deviceC->GetData(key1, item);
743     EXPECT_TRUE(item.value == value1);
744     item.value.clear();
745     g_deviceC->GetData(key2, item);
746     EXPECT_TRUE(item.value == value2);
747     item.value.clear();
748     Key hashKey;
749     DistributedDBToolsUnitTest::CalcHash(key3, hashKey);
750     EXPECT_TRUE(g_deviceC->GetData(hashKey, item) == -E_NOT_FOUND);
751     item.value.clear();
752     g_deviceC->GetData(key4, item);
753     EXPECT_TRUE(item.value == value4);
754 }
755 
756 /**
757  * @tc.name: Device Offline Sync 002
758  * @tc.desc: Test pull sync when device offline
759  * @tc.type: FUNC
760  * @tc.require: AR000CCPOM
761  * @tc.author: xushaohua
762  */
763 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSync002, TestSize.Level1)
764 {
765     std::vector<std::string> devices;
766     devices.push_back(g_deviceB->GetDeviceId());
767     devices.push_back(g_deviceC->GetDeviceId());
768 
769     /**
770      * @tc.steps: step1. deviceB put {k1, v1}
771      */
772     Key key1 = {'1'};
773     Value value1 = {'1'};
774     g_deviceB->PutData(key1, value1, 0, 0);
775 
776     /**
777      * @tc.steps: step2. deviceB offline
778      */
779     g_deviceB->Offline();
780 
781     /**
782      * @tc.steps: step3. deviceC put {k2, v2}, {k3, delete}, {k4, v4}
783      */
784     Key key2 = {'2'};
785     Value value2 = {'2'};
786     g_deviceC->PutData(key2, value2, 0, 0);
787 
788     Key key3 = {'3'};
789     Value value3 = {'3'};
790     g_deviceC->PutData(key3, value3, 0, 1);
791 
792     Key key4 = {'4'};
793     Value value4 = {'4'};
794     g_deviceC->PutData(key4, value4, 0, 0);
795 
796     /**
797      * @tc.steps: step2. deviceA call pull sync
798      * @tc.expected: step2. sync should return OK.
799      */
800     std::map<std::string, DBStatus> result;
801     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
802     ASSERT_TRUE(status == OK);
803 
804     /**
805      * @tc.expected: step3. onComplete should be called, DeviceB status is timeout
806      *     deviceA has {k2, v2}, {k3 delete}, {k4,v4}
807      */
808     for (const auto &pair : result) {
809         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
810         if (pair.first == DEVICE_B) {
811             // If syncTaskContext of deviceB is scheduled to be executed first, ClearAllSyncTask is
812             // invoked when OfflineHandleByDevice is triggered, and SyncOperation::Finished() is triggered in advance.
813             // The returned status is COMM_FAILURE
814             EXPECT_TRUE((pair.second == static_cast<DBStatus>(-E_PERIPHERAL_INTERFACE_FAIL)) ||
815                 (pair.second == COMM_FAILURE));
816         } else {
817             EXPECT_EQ(pair.second, OK);
818         }
819     }
820 
821     Value value5;
822     EXPECT_TRUE(g_kvDelegatePtr->Get(key1, value5) != OK);
823     g_kvDelegatePtr->Get(key2, value5);
824     EXPECT_EQ(value5, value2);
825     EXPECT_TRUE(g_kvDelegatePtr->Get(key3, value5) != OK);
826     g_kvDelegatePtr->Get(key4, value5);
827     EXPECT_EQ(value5, value4);
828 }
829 
830 /**
831   * @tc.name: EncryptedAlgoUpgrade001
832   * @tc.desc: Test upgrade encrypted db can sync normally
833   * @tc.type: FUNC
834   * @tc.require: AR000HI2JS
835   * @tc.author: zhuwentao
836   */
837 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, EncryptedAlgoUpgrade001, TestSize.Level3)
838 {
839     /**
840      * @tc.steps: step1. clear db
841      * * @tc.expected: step1. interface return ok
842     */
843     if (g_kvDelegatePtr != nullptr) {
844         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
845         g_kvDelegatePtr = nullptr;
846         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
847         LOGD("delete kv store status %d", status);
848         ASSERT_TRUE(status == OK);
849     }
850 
851     CipherPassword passwd;
852     std::vector<uint8_t> passwdVect = {'p', 's', 'd', '1'};
853     passwd.SetValue(passwdVect.data(), passwdVect.size());
854     /**
855      * @tc.steps: step2. open old db by sql
856      * * @tc.expected: step2. interface return ok
857     */
858     std::string identifier = DBCommon::GenerateIdentifierId(STORE_ID, APP_ID, USER_ID);
859     std::string hashDir = DBCommon::TransferHashString(identifier);
860     std::string hexHashDir = DBCommon::TransferStringToHex(hashDir);
861     std::string dbPath = g_testDir + "/" + hexHashDir + "/single_ver";
862     ASSERT_TRUE(DBCommon::CreateDirectory(g_testDir + "/" + hexHashDir) == E_OK);
863     ASSERT_TRUE(DBCommon::CreateDirectory(dbPath) == E_OK);
864     std::vector<std::string> dbDir {DBConstant::MAINDB_DIR, DBConstant::METADB_DIR, DBConstant::CACHEDB_DIR};
865     for (const auto &item : dbDir) {
866         ASSERT_TRUE(DBCommon::CreateDirectory(dbPath + "/" + item) == E_OK);
867     }
868     uint64_t flag = SQLITE_OPEN_URI | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
869     sqlite3 *db;
870     std::string fileUrl = dbPath + "/" + DBConstant::MAINDB_DIR + "/" + DBConstant::SINGLE_VER_DATA_STORE + ".db";
871     ASSERT_TRUE(sqlite3_open_v2(fileUrl.c_str(), &db, flag, nullptr) == SQLITE_OK);
872     SQLiteUtils::SetKeyInner(db, CipherType::AES_256_GCM, passwd, DBConstant::DEFAULT_ITER_TIMES);
873     /**
874      * @tc.steps: step3. create table and close
875      * * @tc.expected: step3. interface return ok
876     */
877     ASSERT_TRUE(SQLiteUtils::ExecuteRawSQL(db, CREATE_SYNC_TABLE_SQL) == E_OK);
878     sqlite3_close_v2(db);
879     db = nullptr;
880     LOGI("create old db success");
881     /**
882      * @tc.steps: step4. get kvstore
883      * * @tc.expected: step4. interface return ok
884     */
885     KvStoreNbDelegate::Option option;
886     option.isEncryptedDb = true;
887     option.cipher = CipherType::AES_256_GCM;
888     option.passwd = passwd;
889     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
890     ASSERT_TRUE(g_kvDelegateStatus == OK);
891     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
892     /**
893      * @tc.steps: step5. sync ok
894      * * @tc.expected: step5. interface return ok
895     */
896     PullSyncTest();
897     /**
898      * @tc.steps: step5. crud ok
899      * * @tc.expected: step5. interface return ok
900     */
901     CrudTest();
902 }
903 
904 /**
905   * @tc.name: RemoveDeviceData002
906   * @tc.desc: test remove device data before sync
907   * @tc.type: FUNC
908   * @tc.require:
909   * @tc.author: zhuwentao
910   */
911 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RemoveDeviceData002, TestSize.Level1)
912 {
913     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
914     /**
915      * @tc.steps: step1. sync deviceB data to A and check data
916      * * @tc.expected: step1. interface return ok
917     */
918     Key key1 = {'1'};
919     Key key2 = {'2'};
920     Value value = {'1'};
921     Timestamp currentTime;
922     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
923     EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
924     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
925     EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
926     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
927     Value actualValue;
928     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
929     EXPECT_EQ(actualValue, value);
930     actualValue.clear();
931     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
932     EXPECT_EQ(actualValue, value);
933     /**
934      * @tc.steps: step2. call RemoveDeviceData
935      * * @tc.expected: step2. interface return ok
936     */
937     g_kvDelegatePtr->RemoveDeviceData(g_deviceB->GetDeviceId());
938     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
939     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), NOT_FOUND);
940     /**
941      * @tc.steps: step3. sync to device A again and check data
942      * * @tc.expected: step3. sync ok
943     */
944     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
945     actualValue.clear();
946     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
947     EXPECT_EQ(actualValue, value);
948     actualValue.clear();
949     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
950     EXPECT_EQ(actualValue, value);
951 }
952 
953 /**
954   * @tc.name: DataSync001
955   * @tc.desc: Test Data Sync when Initialize
956   * @tc.type: FUNC
957   * @tc.require: AR000HI2JS
958   * @tc.author: zhuwentao
959   */
960 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync001, TestSize.Level1)
961 {
962     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
963     ASSERT_TRUE(dataSync != nullptr);
964     std::shared_ptr<Metadata> inMetadata = nullptr;
965     std::string deviceId;
966     Message message;
967     VirtualSingleVerSyncDBInterface tmpInterface;
968     VirtualCommunicator tmpCommunicator(deviceId, g_communicatorAggregator);
969     EXPECT_EQ(dataSync->Initialize(nullptr, nullptr, inMetadata, deviceId), -E_INVALID_ARGS);
970     EXPECT_EQ(dataSync->Initialize(&tmpInterface, nullptr, inMetadata, deviceId), -E_INVALID_ARGS);
971     EXPECT_EQ(dataSync->Initialize(&tmpInterface, &tmpCommunicator, inMetadata, deviceId), -E_INVALID_ARGS);
972     delete dataSync;
973 }
974 
975 /**
976   * @tc.name: DataSync002
977   * @tc.desc: Test active sync with invalid param in DataSync Class
978   * @tc.type: FUNC
979   * @tc.require: AR000HI2JS
980   * @tc.author: zhuwentao
981   */
982 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync002, TestSize.Level1)
983 {
984     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
985     ASSERT_TRUE(dataSync != nullptr);
986     Message message;
987     EXPECT_EQ(dataSync->TryContinueSync(nullptr, &message), -E_INVALID_ARGS);
988     EXPECT_EQ(dataSync->TryContinueSync(nullptr, nullptr), -E_INVALID_ARGS);
989     EXPECT_EQ(dataSync->PushStart(nullptr), -E_INVALID_ARGS);
990     EXPECT_EQ(dataSync->PushPullStart(nullptr), -E_INVALID_ARGS);
991     EXPECT_EQ(dataSync->PullRequestStart(nullptr), -E_INVALID_ARGS);
992     EXPECT_EQ(dataSync->PullResponseStart(nullptr), -E_INVALID_ARGS);
993     delete dataSync;
994 }
995 
996 /**
997   * @tc.name: DataSync003
998   * @tc.desc: Test receive invalid request data packet in DataSync Class
999   * @tc.type: FUNC
1000   * @tc.require: AR000HI2JS
1001   * @tc.author: zhuwentao
1002   */
1003 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync003, TestSize.Level1)
1004 {
1005     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1006     ASSERT_TRUE(dataSync != nullptr);
1007     uint64_t tmpMark = 0;
1008     Message message;
1009     EXPECT_EQ(dataSync->DataRequestRecv(nullptr, nullptr, tmpMark), -E_INVALID_ARGS);
1010     EXPECT_EQ(dataSync->DataRequestRecv(nullptr, &message, tmpMark), -E_INVALID_ARGS);
1011     delete dataSync;
1012 }
1013 
1014 /**
1015   * @tc.name: DataSync004
1016   * @tc.desc: Test receive invalid ack packet in DataSync Class
1017   * @tc.type: FUNC
1018   * @tc.require: AR000HI2JS
1019   * @tc.author: zhuwentao
1020   */
1021 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync004, TestSize.Level1)
1022 {
1023     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1024     ASSERT_TRUE(dataSync != nullptr);
1025     Message message;
1026     TestSingleVerKvSyncTaskContext tmpContext;
1027     EXPECT_EQ(dataSync->AckPacketIdCheck(nullptr), false);
1028     EXPECT_EQ(dataSync->AckPacketIdCheck(&message), false);
1029     EXPECT_EQ(dataSync->AckRecv(&tmpContext, nullptr), -E_INVALID_ARGS);
1030     EXPECT_EQ(dataSync->AckRecv(nullptr, nullptr), -E_INVALID_ARGS);
1031     EXPECT_EQ(dataSync->AckRecv(nullptr, &message), -E_INVALID_ARGS);
1032     delete dataSync;
1033 }
1034 
1035 /**
1036   * @tc.name: DataSync005
1037   * @tc.desc: Test receive invalid notify packet in DataSync Class
1038   * @tc.type: FUNC
1039   * @tc.require: AR000HI2JS
1040   * @tc.author: zhuwentao
1041   */
1042 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync005, TestSize.Level1)
1043 {
1044     ASSERT_NO_FATAL_FAILURE(DataSync005());
1045 }
1046 
1047 /**
1048   * @tc.name: DataSync006
1049   * @tc.desc: Test control start with invalid param in DataSync Class
1050   * @tc.type: FUNC
1051   * @tc.require: AR000HI2JS
1052   * @tc.author: zhuwentao
1053   */
1054 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync006, TestSize.Level1)
1055 {
1056     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1057     ASSERT_TRUE(dataSync != nullptr);
1058     TestSingleVerKvSyncTaskContext tmpContext;
1059     EXPECT_EQ(dataSync->ControlCmdStart(nullptr), -E_INVALID_ARGS);
1060     EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_INVALID_ARGS);
1061     std::shared_ptr<SubscribeManager> subManager = std::make_shared<SubscribeManager>();
1062     tmpContext.SetSubscribeManager(subManager);
1063     tmpContext.SetMode(SyncModeType::INVALID_MODE);
1064     EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_INVALID_ARGS);
1065     std::set<Key> Keys = {{'a'}, {'b'}};
1066     Query query = Query::Select().InKeys(Keys);
1067     QuerySyncObject innerQuery(query);
1068     tmpContext.SetQuery(innerQuery);
1069     tmpContext.SetMode(SyncModeType::SUBSCRIBE_QUERY);
1070     EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_NOT_SUPPORT);
1071     delete dataSync;
1072     subManager = nullptr;
1073 }
1074 
1075 /**
1076   * @tc.name: DataSync007
1077   * @tc.desc: Test receive invalid control packet in DataSync Class
1078   * @tc.type: FUNC
1079   * @tc.require: AR000HI2JS
1080   * @tc.author: zhuwentao
1081   */
1082 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync007, TestSize.Level1)
1083 {
1084     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1085     ASSERT_TRUE(dataSync != nullptr);
1086     Message message;
1087     ControlRequestPacket packet;
1088     TestSingleVerKvSyncTaskContext tmpContext;
1089     EXPECT_EQ(dataSync->ControlCmdRequestRecv(nullptr, &message), -E_INVALID_ARGS);
1090     message.SetCopiedObject(packet);
1091     EXPECT_EQ(dataSync->ControlCmdRequestRecv(nullptr, &message), -E_INVALID_ARGS);
1092     delete dataSync;
1093 }
1094 
1095 /**
1096   * @tc.name: DataSync008
1097   * @tc.desc: Test pull null msg in dataQueue in DataSync Class
1098   * @tc.type: FUNC
1099   * @tc.require: AR000HI2JS
1100   * @tc.author: zhuwentao
1101   */
1102 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync008, TestSize.Level1)
1103 {
1104     ASSERT_NO_FATAL_FAILURE(DataSync008());
1105 }
1106 
1107 /**
1108  * @tc.name: SyncRetry001
1109  * @tc.desc: use sync retry sync use push
1110  * @tc.type: FUNC
1111  * @tc.require: AR000CKRTD AR000CQE0E
1112  * @tc.author: zhuwentao
1113  */
1114 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry001, TestSize.Level3)
1115 {
1116     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1117     std::vector<std::string> devices;
1118     devices.push_back(g_deviceB->GetDeviceId());
1119 
1120     /**
1121      * @tc.steps: step1. set sync retry
1122      * @tc.expected: step1, Pragma return OK.
1123      */
1124     int pragmaData = 1;
1125     PragmaData input = static_cast<PragmaData>(&pragmaData);
1126     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1127 
1128     /**
1129      * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
1130      */
1131     ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
1132 
1133     /**
1134      * @tc.steps: step3. deviceA call sync and wait
1135      * @tc.expected: step3. sync should return OK.
1136      */
1137     std::map<std::string, DBStatus> result;
1138     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1139 
1140     /**
1141      * @tc.expected: step4. onComplete should be called, and status is time_out
1142      */
1143     ASSERT_TRUE(result.size() == devices.size());
1144     for (const auto &pair : result) {
1145         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1146         EXPECT_TRUE(pair.second == OK);
1147     }
1148     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1149 }
1150 
1151 /**
1152  * @tc.name: SyncRetry002
1153  * @tc.desc: use sync retry sync use pull
1154  * @tc.type: FUNC
1155  * @tc.require: AR000CKRTD AR000CQE0E
1156  * @tc.author: zhuwentao
1157  */
1158 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry002, TestSize.Level3)
1159 {
1160     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE, 4u);
1161     std::vector<std::string> devices;
1162     devices.push_back(g_deviceB->GetDeviceId());
1163 
1164     /**
1165      * @tc.steps: step1. set sync retry
1166      * @tc.expected: step1, Pragma return OK.
1167      */
1168     int pragmaData = 1;
1169     PragmaData input = static_cast<PragmaData>(&pragmaData);
1170     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1171 
1172     /**
1173      * @tc.steps: step2. deviceA call sync and wait
1174      * @tc.expected: step2. sync should return OK.
1175      */
1176     std::map<std::string, DBStatus> result;
1177     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
1178 
1179     /**
1180      * @tc.expected: step3. onComplete should be called, and status is time_out
1181      */
1182     ASSERT_TRUE(result.size() == devices.size());
1183     for (const auto &pair : result) {
1184         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1185         EXPECT_TRUE(pair.second == TIME_OUT);
1186     }
1187     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1188 }
1189 
1190 /**
1191  * @tc.name: SyncRetry003
1192  * @tc.desc: use sync retry sync use push by compress
1193  * @tc.type: FUNC
1194  * @tc.require: AR000CKRTD AR000CQE0E
1195  * @tc.author: zhuwentao
1196  */
1197 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry003, TestSize.Level3)
1198 {
1199     if (g_kvDelegatePtr != nullptr) {
1200         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1201         g_kvDelegatePtr = nullptr;
1202     }
1203     /**
1204      * @tc.steps: step1. open db use Compress
1205      * @tc.expected: step1, Pragma return OK.
1206      */
1207     KvStoreNbDelegate::Option option;
1208     option.isNeedCompressOnSync = true;
1209     option.compressionRate = 70;
1210     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1211     ASSERT_TRUE(g_kvDelegateStatus == OK);
1212     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1213 
1214     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1215     std::vector<std::string> devices;
1216     devices.push_back(g_deviceB->GetDeviceId());
1217 
1218     /**
1219      * @tc.steps: step2. set sync retry
1220      * @tc.expected: step2, Pragma return OK.
1221      */
1222     int pragmaData = 1;
1223     PragmaData input = static_cast<PragmaData>(&pragmaData);
1224     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1225 
1226     /**
1227      * @tc.steps: step3. deviceA put {k1, v1}, {k2, v2}
1228      */
1229     ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
1230 
1231     /**
1232      * @tc.steps: step4. deviceA call sync and wait
1233      * @tc.expected: step4. sync should return OK.
1234      */
1235     std::map<std::string, DBStatus> result;
1236     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1237 
1238     /**
1239      * @tc.expected: step5. onComplete should be called, and status is time_out
1240      */
1241     ASSERT_TRUE(result.size() == devices.size());
1242     for (const auto &pair : result) {
1243         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1244         EXPECT_TRUE(pair.second == OK);
1245     }
1246     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1247 }
1248 
1249 /**
1250  * @tc.name: SyncRetry004
1251  * @tc.desc: use query sync retry sync use push
1252  * @tc.type: FUNC
1253  * @tc.require: AR000CKRTD AR000CQE0E
1254  * @tc.author: zhuwentao
1255  */
1256 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry004, TestSize.Level3)
1257 {
1258     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1259     std::vector<std::string> devices;
1260     devices.push_back(g_deviceB->GetDeviceId());
1261 
1262     /**
1263      * @tc.steps: step1. set sync retry
1264      * @tc.expected: step1, Pragma return OK.
1265      */
1266     int pragmaData = 1;
1267     PragmaData input = static_cast<PragmaData>(&pragmaData);
1268     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1269 
1270     /**
1271      * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
1272      */
1273     for (int i = 0; i < 5; i++) {
1274         Key key = DistributedDBToolsUnitTest::GetRandPrefixKey({'a', 'b'}, 128); // rand num 1024 for test
1275         Value value;
1276         DistributedDBToolsUnitTest::GetRandomKeyValue(value, 256u);
1277         EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
1278     }
1279 
1280     /**
1281      * @tc.steps: step3. deviceA call sync and wait
1282      * @tc.expected: step3. sync should return OK.
1283      */
1284     std::map<std::string, DBStatus> result;
1285     std::vector<uint8_t> prefixKey({'a', 'b'});
1286     Query query = Query::Select().PrefixKey(prefixKey);
1287     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query) == OK);
1288 
1289     /**
1290      * @tc.expected: step4. onComplete should be called, and status is time_out
1291      */
1292     ASSERT_TRUE(result.size() == devices.size());
1293     for (const auto &pair : result) {
1294         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1295         EXPECT_TRUE(pair.second == OK);
1296     }
1297     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1298 }
1299 
1300 /**
1301  * @tc.name: SyncRetry005
1302  * @tc.desc: use sync retry sync use pull by compress
1303  * @tc.type: FUNC
1304  * @tc.require: AR000CKRTD AR000CQE0E
1305  * @tc.author: zhangqiquan
1306  */
1307 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry005, TestSize.Level3)
1308 {
1309     if (g_kvDelegatePtr != nullptr) {
1310         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1311         g_kvDelegatePtr = nullptr;
1312     }
1313     /**
1314      * @tc.steps: step1. open db use Compress
1315      * @tc.expected: step1, Pragma return OK.
1316      */
1317     KvStoreNbDelegate::Option option;
1318     option.isNeedCompressOnSync = true;
1319     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1320     ASSERT_TRUE(g_kvDelegateStatus == OK);
1321     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1322 
1323     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1324     std::vector<std::string> devices;
1325     devices.push_back(g_deviceB->GetDeviceId());
1326 
1327     /**
1328      * @tc.steps: step2. set sync retry
1329      * @tc.expected: step2, Pragma return OK.
1330      */
1331     int pragmaData = 1;
1332     PragmaData input = static_cast<PragmaData>(&pragmaData);
1333     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1334 
1335     /**
1336      * @tc.steps: step3. deviceA call sync and wait
1337      * @tc.expected: step3. sync should return OK.
1338      */
1339     std::map<std::string, DBStatus> result;
1340     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
1341 
1342     /**
1343      * @tc.expected: step4. onComplete should be called, and status is time_out
1344      */
1345     ASSERT_TRUE(result.size() == devices.size());
1346     for (const auto &pair : result) {
1347         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1348         EXPECT_EQ(pair.second, OK);
1349     }
1350     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1351 }
1352 
1353 /**
1354  * @tc.name: ReSetWatchDogTest001
1355  * @tc.desc: trigger resetWatchDog while pull
1356  * @tc.type: FUNC
1357  * @tc.require: AR000CKRTD AR000CQE0E
1358  * @tc.author: zhuwentao
1359  */
1360 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, ReSetWaterDogTest001, TestSize.Level3)
1361 {
1362     ASSERT_NO_FATAL_FAILURE(ReSetWaterDogTest001());
1363 }
1364 
1365 /**
1366   * @tc.name: RebuildSync001
1367   * @tc.desc: rebuild db and sync again
1368   * @tc.type: FUNC
1369   * @tc.require:
1370   * @tc.author: zhuwentao
1371   */
1372 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync001, TestSize.Level3)
1373 {
1374     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1375     /**
1376      * @tc.steps: step1. sync deviceB data to A and check data
1377      * * @tc.expected: step1. interface return ok
1378     */
1379     Key key1 = {'1'};
1380     Key key2 = {'2'};
1381     Value value = {'1'};
1382     Timestamp currentTime;
1383     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1384     EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1385     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1386     EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
1387     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1388     std::this_thread::sleep_for(std::chrono::seconds(1));
1389 
1390     Value actualValue;
1391     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1392     EXPECT_EQ(actualValue, value);
1393     actualValue.clear();
1394     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1395     EXPECT_EQ(actualValue, value);
1396     /**
1397      * @tc.steps: step2. delete db and rebuild
1398      * * @tc.expected: step2. interface return ok
1399     */
1400     g_mgr.CloseKvStore(g_kvDelegatePtr);
1401     g_kvDelegatePtr = nullptr;
1402     ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1403     KvStoreNbDelegate::Option option;
1404     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1405     ASSERT_TRUE(g_kvDelegateStatus == OK);
1406     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1407     /**
1408      * @tc.steps: step3. sync to device A again
1409      * * @tc.expected: step3. sync ok
1410     */
1411     value = {'2'};
1412     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1413     EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1414     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1415     std::this_thread::sleep_for(std::chrono::seconds(1));
1416     /**
1417      * @tc.steps: step4. check data in device A
1418      * * @tc.expected: step4. check ok
1419     */
1420     actualValue.clear();
1421     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1422     EXPECT_EQ(actualValue, value);
1423 }
1424 
1425 /**
1426   * @tc.name: RebuildSync002
1427   * @tc.desc: test clear remote data when receive data
1428   * @tc.type: FUNC
1429   * @tc.require:
1430   * @tc.author: zhuwentao
1431   */
1432 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync002, TestSize.Level1)
1433 {
1434     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1435     std::vector<std::string> devices;
1436     devices.push_back(g_deviceB->GetDeviceId());
1437     /**
1438      * @tc.steps: step1. device A SET_WIPE_POLICY
1439      * * @tc.expected: step1. interface return ok
1440     */
1441     int pragmaData = 2; // 2 means enable
1442     PragmaData input = static_cast<PragmaData>(&pragmaData);
1443     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_WIPE_POLICY, input) == OK);
1444     /**
1445      * @tc.steps: step2. sync deviceB data to A and check data
1446      * * @tc.expected: step2. interface return ok
1447     */
1448     Key key1 = {'1'};
1449     Key key2 = {'2'};
1450     Key key3 = {'3'};
1451     Key key4 = {'4'};
1452     Value value = {'1'};
1453     Timestamp currentTime;
1454     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1455     EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1456     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1457     EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
1458     EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
1459     /**
1460      * @tc.steps: step3. deviceA call pull sync
1461      * @tc.expected: step3. sync should return OK.
1462      */
1463     std::map<std::string, DBStatus> result;
1464     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result) == OK);
1465 
1466     /**
1467      * @tc.expected: step4. onComplete should be called, check data
1468      */
1469     ASSERT_TRUE(result.size() == devices.size());
1470     for (const auto &pair : result) {
1471         EXPECT_TRUE(pair.second == OK);
1472     }
1473     Value actualValue;
1474     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1475     EXPECT_EQ(actualValue, value);
1476     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1477     EXPECT_EQ(actualValue, value);
1478     /**
1479      * @tc.steps: step5. device B rebuild and put some data
1480      * * @tc.expected: step5. rebuild ok
1481     */
1482     if (g_deviceB != nullptr) {
1483         delete g_deviceB;
1484         g_deviceB = nullptr;
1485     }
1486     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
1487     ASSERT_TRUE(g_deviceB != nullptr);
1488     VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
1489     ASSERT_TRUE(syncInterfaceB != nullptr);
1490     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
1491     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1492     EXPECT_EQ(g_deviceB->PutData(key3, value, currentTime, 0), E_OK);
1493     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1494     EXPECT_EQ(g_deviceB->PutData(key4, value, currentTime, 0), E_OK);
1495     /**
1496      * @tc.steps: step6. sync to device A again and check data
1497      * * @tc.expected: step6. sync ok
1498     */
1499     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1500     EXPECT_EQ(g_kvDelegatePtr->Get(key3, actualValue), OK);
1501     EXPECT_EQ(actualValue, value);
1502     EXPECT_EQ(g_kvDelegatePtr->Get(key4, actualValue), OK);
1503     EXPECT_EQ(actualValue, value);
1504     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1505     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), NOT_FOUND);
1506 }
1507 
1508 /**
1509   * @tc.name: RebuildSync003
1510   * @tc.desc: test clear history data when receive ack
1511   * @tc.type: FUNC
1512   * @tc.require:
1513   * @tc.author: zhuwentao
1514   */
1515 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync003, TestSize.Level1)
1516 {
1517     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1518     /**
1519      * @tc.steps: step1. sync deviceB data to A and check data
1520      * * @tc.expected: step1. interface return ok
1521     */
1522     Key key1 = {'1'};
1523     Key key2 = {'2'};
1524     Key key3 = {'3'};
1525     Key key4 = {'4'};
1526     Value value = {'1'};
1527     EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1528     EXPECT_EQ(g_deviceB->PutData(key2, value, 2u, 0), E_OK); // 2: timestamp
1529     EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
1530     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1531     Value actualValue;
1532     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1533     EXPECT_EQ(actualValue, value);
1534     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1535     EXPECT_EQ(actualValue, value);
1536     VirtualDataItem item;
1537     EXPECT_EQ(g_deviceB->GetData(key3, item), E_OK);
1538     EXPECT_EQ(item.value, value);
1539     /**
1540      * @tc.steps: step2. device B sync to device A,but make it failed
1541      * * @tc.expected: step2. interface return ok
1542     */
1543     EXPECT_EQ(g_deviceB->PutData(key4, value, 3u, 0), E_OK); // 3: timestamp
1544     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_A, DATA_SYNC_MESSAGE);
1545     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1546     /**
1547      * @tc.steps: step3. device B set delay send time
1548      * * @tc.expected: step3. interface return ok
1549     */
1550     std::set<std::string> delayDevice = {DEVICE_B};
1551     g_communicatorAggregator->SetSendDelayInfo(3000u, DATA_SYNC_MESSAGE, 1u, 0u, delayDevice); // delay 3000ms one time
1552     /**
1553      * @tc.steps: step4. device A rebuilt, device B push data to A and set clear remote data mark into context after 1s
1554      * * @tc.expected: step4. interface return ok
1555     */
1556     g_deviceB->SetClearRemoteStaleData(true);
1557     g_mgr.CloseKvStore(g_kvDelegatePtr);
1558     g_kvDelegatePtr = nullptr;
1559     ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1560     KvStoreNbDelegate::Option option;
1561     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1562     ASSERT_TRUE(g_kvDelegateStatus == OK);
1563     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1564     std::map<std::string, DBStatus> result;
1565     std::vector<std::string> devices = {g_deviceB->GetDeviceId()};
1566     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1567     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1568     /**
1569      * @tc.steps: step5. device B sync to A, make it clear history data and check data
1570      * * @tc.expected: step5. interface return ok
1571     */
1572     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1573     EXPECT_EQ(g_deviceB->GetData(key3, item), -E_NOT_FOUND);
1574     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1575     EXPECT_EQ(actualValue, value);
1576     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1577     EXPECT_EQ(actualValue, value);
1578     g_communicatorAggregator->ResetSendDelayInfo();
1579 }
1580 
1581 /**
1582   * @tc.name: RebuildSync004
1583   * @tc.desc: test WIPE_STALE_DATA mode when peers rebuilt db
1584   * @tc.type: FUNC
1585   * @tc.require:
1586   * @tc.author: zhangtao
1587   */
1588 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync004, TestSize.Level1)
1589 {
1590     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1591     /**
1592      * @tc.steps: step1. sync deviceB data to A and check data
1593      * * @tc.expected: step1. interface return ok
1594     */
1595     Key key1 = {'1'};
1596     Key key2 = {'2'};
1597     Key key3 = {'3'};
1598     Key key4 = {'4'};
1599     Value value = {'1'};
1600     EXPECT_EQ(g_kvDelegatePtr->Put(key1, value), OK);
1601     EXPECT_EQ(g_kvDelegatePtr->Put(key2, value), OK);
1602     EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
1603     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1604     Value actualValue;
1605     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1606     EXPECT_EQ(actualValue, value);
1607     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1608     EXPECT_EQ(actualValue, value);
1609     EXPECT_EQ(g_kvDelegatePtr->Get(key3, actualValue), OK);
1610     EXPECT_EQ(actualValue, value);
1611     VirtualDataItem item;
1612     EXPECT_EQ(g_deviceB->GetData(key1, item), E_OK);
1613     EXPECT_EQ(item.value, value);
1614     EXPECT_EQ(g_deviceB->GetData(key2, item), E_OK);
1615     EXPECT_EQ(item.value, value);
1616     EXPECT_EQ(g_deviceB->GetData(key3, item), E_OK);
1617     EXPECT_EQ(item.value, value);
1618 
1619     /**
1620      * @tc.steps: step2. device A rebuilt, device B push data to A and set clear remote data mark into context after 1s
1621      * * @tc.expected: step2. interface return ok
1622     */
1623     g_deviceB->SetClearRemoteStaleData(true);
1624     EXPECT_EQ(g_deviceB->PutData(key4, value, 3u, 2), E_OK); // 3: timestamp
1625 
1626     VirtualDataItem item2;
1627     EXPECT_EQ(g_deviceB->GetData(key4, item2), E_OK);
1628     EXPECT_EQ(item2.value, value);
1629     g_mgr.CloseKvStore(g_kvDelegatePtr);
1630     g_kvDelegatePtr = nullptr;
1631     ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1632     KvStoreNbDelegate::Option option;
1633     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1634     ASSERT_TRUE(g_kvDelegateStatus == OK);
1635     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1636 
1637     /**
1638      * @tc.steps: step3. device B sync to A, make it clear history data and check data
1639      * * @tc.expected: step3. interface return ok
1640     */
1641     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1642     EXPECT_EQ(g_deviceB->GetData(key2, item), -E_NOT_FOUND);
1643     EXPECT_EQ(g_deviceB->GetData(key3, item), -E_NOT_FOUND);
1644     EXPECT_EQ(g_deviceB->GetData(key4, item2), E_OK);
1645     EXPECT_EQ(item2.value, value);
1646     EXPECT_EQ(g_kvDelegatePtr->Get(key4, actualValue), OK);
1647     EXPECT_EQ(actualValue, value);
1648 }
1649 
1650 /**
1651   * @tc.name: RemoveDeviceData001
1652   * @tc.desc: call rekey and removeDeviceData Concurrently
1653   * @tc.type: FUNC
1654   * @tc.require: AR000D487B
1655   * @tc.author: zhuwentao
1656   */
1657 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RemoveDeviceData001, TestSize.Level1)
1658 {
1659     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1660     /**
1661      * @tc.steps: step1. sync deviceB data to A
1662      * * @tc.expected: step1. interface return ok
1663     */
1664     Key key1 = {'1'};
1665     Key key2 = {'2'};
1666     Value value = {'1'};
1667     g_deviceB->PutData(key1, value, 1, 0);
1668     g_deviceB->PutData(key2, value, 2, 0);
1669     g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true);
1670 
1671     Value actualValue;
1672     g_kvDelegatePtr->Get(key1, actualValue);
1673     EXPECT_EQ(actualValue, value);
1674     actualValue.clear();
1675     g_kvDelegatePtr->Get(key2, actualValue);
1676     EXPECT_EQ(actualValue, value);
1677     /**
1678      * @tc.steps: step2. call Rekey and RemoveDeviceData Concurrently
1679      * * @tc.expected: step2. interface return ok
1680     */
__anon52917de11302() 1681     std::thread thread1([]() {
1682         CipherPassword passwd3;
1683         std::vector<uint8_t> passwdVect = {'p', 's', 'd', 'z'};
1684         passwd3.SetValue(passwdVect.data(), passwdVect.size());
1685         g_kvDelegatePtr->Rekey(passwd3);
1686     });
__anon52917de11402() 1687     std::thread thread2([]() {
1688         g_kvDelegatePtr->RemoveDeviceData(g_deviceB->GetDeviceId());
1689     });
1690     thread1.join();
1691     thread2.join();
1692 }
1693 
1694 /**
1695   * @tc.name: DeviceOfflineSyncTask001
1696   * @tc.desc: Test sync task when device offline and close db Concurrently
1697   * @tc.type: FUNC
1698   * @tc.require: AR000HI2JS
1699   * @tc.author: zhuwentao
1700   */
1701 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask001, TestSize.Level3)
1702 {
1703     DBStatus status = OK;
1704     std::vector<std::string> devices;
1705     devices.push_back(g_deviceB->GetDeviceId());
1706 
1707     /**
1708      * @tc.steps: step1. deviceA put {k1, v1}
1709      */
1710     Key key = {'1'};
1711     Value value = {'1'};
1712     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1713 
1714     /**
1715      * @tc.steps: step2. deviceA set auto sync and put some key/value
1716      * @tc.expected: step2. interface should return OK.
1717      */
1718     bool autoSync = true;
1719     PragmaData data = static_cast<PragmaData>(&autoSync);
1720     status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1721     ASSERT_EQ(status, OK);
1722 
1723     Key key1 = {'2'};
1724     Key key2 = {'3'};
1725     Key key3 = {'4'};
1726     Key key4 = {'5'};
1727     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1728     ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value) == OK);
1729     ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value) == OK);
1730     ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value) == OK);
1731     ASSERT_TRUE(g_kvDelegatePtr->Put(key4, value) == OK);
1732     /**
1733      * @tc.steps: step3. device offline and close db Concurrently
1734      * @tc.expected: step3. interface should return OK.
1735      */
__anon52917de11502() 1736     std::thread thread1([]() {
1737         g_mgr.CloseKvStore(g_kvDelegatePtr);
1738         g_kvDelegatePtr = nullptr;
1739     });
__anon52917de11602() 1740     std::thread thread2([]() {
1741         g_deviceB->Offline();
1742     });
1743     thread1.join();
1744     thread2.join();
1745     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1746     ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1747 }
1748 
1749 /**
1750   * @tc.name: DeviceOfflineSyncTask002
1751   * @tc.desc: Test sync task when autoSync and close db Concurrently
1752   * @tc.type: FUNC
1753   * @tc.require:
1754   * @tc.author: zhuwentao
1755   */
1756 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask002, TestSize.Level3)
1757 {
1758     DBStatus status = OK;
1759     g_deviceC->Offline();
1760 
1761     /**
1762      * @tc.steps: step1. deviceA put {k1, v1}
1763      */
1764     Key key = {'1'};
1765     Value value = {'1'};
1766     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1767 
1768     /**
1769      * @tc.steps: step2. deviceA set auto sync and put some key/value
1770      * @tc.expected: step2. interface should return OK.
1771      */
1772     bool autoSync = true;
1773     PragmaData data = static_cast<PragmaData>(&autoSync);
1774     status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1775     ASSERT_EQ(status, OK);
1776     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME * 2));
1777 
1778     Key key1 = {'2'};
1779     Key key2 = {'3'};
1780     Key key3 = {'4'};
1781     ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value) == OK);
1782     ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value) == OK);
1783     ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value) == OK);
1784     /**
1785      * @tc.steps: step3. close db
1786      * @tc.expected: step3. interface should return OK.
1787      */
1788     g_mgr.CloseKvStore(g_kvDelegatePtr);
1789     g_kvDelegatePtr = nullptr;
1790     ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1791 }
1792 
1793 /**
1794   * @tc.name: DeviceOfflineSyncTask003
1795   * @tc.desc: Test sync task when device offline after call sync
1796   * @tc.type: FUNC
1797   * @tc.require:
1798   * @tc.author: zhuwentao
1799   */
1800 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask003, TestSize.Level3)
1801 {
1802     std::vector<std::string> devices;
1803     devices.push_back(g_deviceB->GetDeviceId());
1804 
1805     /**
1806      * @tc.steps: step1. deviceA put {k1, v1}
1807      */
1808     Key key = {'1'};
1809     Value value = {'1'};
1810     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1811     /**
1812      * @tc.steps: step2. device offline after call sync
1813      * @tc.expected: step2. interface should return OK.
1814      */
1815     Query query = Query::Select().PrefixKey(key);
1816     ASSERT_TRUE(g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, query, false) == OK);
1817     std::this_thread::sleep_for(std::chrono::milliseconds(15)); // wait for 15ms
1818     g_deviceB->Offline();
1819 }
1820 
1821 /**
1822   * @tc.name: GetSyncDataFail001
1823   * @tc.desc: test get sync data failed when sync
1824   * @tc.type: FUNC
1825   * @tc.require:
1826   * @tc.author: zhuwentao
1827   */
1828 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail001, TestSize.Level1)
1829 {
1830     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1831     /**
1832      * @tc.steps: step1. device B set get data errCode control and put some data
1833      * * @tc.expected: step1. interface return ok
1834     */
1835     g_deviceB->SetGetDataErrCode(1, -E_BUSY, true);
1836     Key key1 = {'1'};
1837     Value value = {'1'};
1838     EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1839     /**
1840      * @tc.steps: step2. device B sync to device A and check data
1841      * * @tc.expected: step2. interface return ok
1842     */
1843     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1844     Value actualValue;
1845     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1846     g_deviceB->ResetDataControl();
1847 }
1848 
1849 /**
1850   * @tc.name: GetSyncDataFail002
1851   * @tc.desc: test get sync data failed when sync with large data
1852   * @tc.type: FUNC
1853   * @tc.require: AR000D487B
1854   * @tc.author: zhuwentao
1855   */
1856 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail002, TestSize.Level1)
1857 {
1858     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1859     /**
1860      * @tc.steps: step1. device B set get data errCode control and put some data
1861      * * @tc.expected: step1. interface return ok
1862     */
1863     g_deviceB->SetGetDataErrCode(2, -E_BUSY, true);
1864     int totalSize = 4000u;
1865     std::vector<Entry> entries;
1866     std::vector<Key> keys;
1867     const int keyLen = 10; // 20 Bytes
1868     const int valueLen = 10; // 20 Bytes
1869     DistributedDBUnitTest::GenerateRecords(totalSize, entries, keys, keyLen, valueLen);
1870     uint32_t i = 1u;
1871     for (const auto &entry : entries) {
1872         EXPECT_EQ(g_deviceB->PutData(entry.key, entry.value, i, 0), E_OK);
1873         i++;
1874     }
1875     /**
1876      * @tc.steps: step2. device B sync to device A and check data
1877      * * @tc.expected: step2. interface return ok
1878     */
1879     std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
1880     std::map<std::string, DBStatus> result;
1881     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
1882     Value actualValue;
1883     for (int j = 1u; j <= totalSize; j++) {
1884         if (j > totalSize / 2) {
1885             EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), NOT_FOUND);
1886         } else {
1887             EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), OK);
1888         }
1889     }
1890     g_deviceB->ResetDataControl();
1891 }
1892 
1893 /**
1894   * @tc.name: GetSyncDataFail003
1895   * @tc.desc: test get sync data E_EKEYREVOKED failed in push_and_pull sync
1896   * @tc.type: FUNC
1897   * @tc.require:
1898   * @tc.author: zhuwentao
1899   */
1900 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail003, TestSize.Level1)
1901 {
1902     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1903     /**
1904      * @tc.steps: step1. device B set get data errCode control and put some data
1905      * * @tc.expected: step1. interface return ok
1906     */
1907     g_deviceB->SetGetDataErrCode(1, -E_EKEYREVOKED, true);
1908     Key key1 = {'1'};
1909     Key key2 = {'3'};
1910     Value value = {'1'};
1911     EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1912     EXPECT_EQ(g_kvDelegatePtr->Put(key2, value), OK);
1913     /**
1914      * @tc.steps: step2. device B sync to device A and check data
1915      * * @tc.expected: step2. interface return ok
1916     */
1917     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1918     Value actualValue;
1919     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1920     VirtualDataItem item;
1921     EXPECT_EQ(g_deviceB->GetData(key2, item), E_OK);
1922     g_deviceB->ResetDataControl();
1923 }
1924 
1925 /**
1926   * @tc.name: GetSyncDataFail004
1927   * @tc.desc: test get sync data E_EKEYREVOKED failed in push_and_pull sync
1928   * @tc.type: FUNC
1929   * @tc.require:
1930   * @tc.author: zhuwentao
1931   */
1932 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail004, TestSize.Level1)
1933 {
1934     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1935     /**
1936      * @tc.steps: step1. device B set get data errCode control and put some data
1937      * * @tc.expected: step1. interface return ok
1938     */
1939     g_deviceB->SetGetDataErrCode(2, -E_EKEYREVOKED, true);
1940     int totalSize = 4000u;
1941     std::vector<Entry> entries;
1942     std::vector<Key> keys;
1943     const int keyLen = 10; // 20 Bytes
1944     const int valueLen = 10; // 20 Bytes
1945     DistributedDBUnitTest::GenerateRecords(totalSize, entries, keys, keyLen, valueLen);
1946     uint32_t i = 1u;
1947     for (const auto &entry : entries) {
1948         EXPECT_EQ(g_deviceB->PutData(entry.key, entry.value, i, 0), E_OK);
1949         i++;
1950     }
1951     Key key = {'a', 'b', 'c'};
1952     Value value = {'1'};
1953     EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
1954     /**
1955      * @tc.steps: step2. device B sync to device A and check data
1956      * * @tc.expected: step2. interface return ok
1957     */
1958     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1959     std::this_thread::sleep_for(std::chrono::seconds(1));
1960     Value actualValue;
1961     for (int j = 1u; j <= totalSize; j++) {
1962         if (j > totalSize / 2) {
1963             EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), NOT_FOUND);
1964         } else {
1965             EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), OK);
1966         }
1967     }
1968     VirtualDataItem item;
1969     EXPECT_EQ(g_deviceB->GetData(key, item), E_OK);
1970     g_deviceB->ResetDataControl();
1971 }
1972 
1973 /**
1974   * @tc.name: InterceptDataFail001
1975   * @tc.desc: test intercept data failed when sync
1976   * @tc.type: FUNC
1977   * @tc.require:
1978   * @tc.author: zhuwentao
1979   */
1980 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, InterceptDataFail001, TestSize.Level1)
1981 {
1982     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1983     /**
1984      * @tc.steps: step1. device A set intercept data errCode and put some data
1985      * * @tc.expected: step1. interface return ok
1986     */
1987     g_kvDelegatePtr->SetPushDataInterceptor(
__anon52917de11702(InterceptedData &data, const std::string &sourceID, const std::string &targetID) 1988         [](InterceptedData &data, const std::string &sourceID, const std::string &targetID) {
1989             int errCode = OK;
1990             auto entries = data.GetEntries();
1991             LOGD("====here111,size=%d", entries.size());
1992             for (size_t i = 0; i < entries.size(); i++) {
1993                 Key newKey;
1994                 errCode = data.ModifyKey(i, newKey);
1995                 if (errCode != OK) {
1996                     break;
1997                 }
1998             }
1999             return errCode;
2000         }
2001     );
2002     Key key = {'1'};
2003     Value value = {'1'};
2004     EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
2005     /**
2006      * @tc.steps: step2. device A sync to device B and check data
2007      * * @tc.expected: step2. interface return ok
2008     */
2009     std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2010     std::map<std::string, DBStatus> result;
2011     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
2012     ASSERT_TRUE(result.size() == devices.size());
2013     for (const auto &pair : result) {
2014         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2015         EXPECT_TRUE(pair.second == INTERCEPT_DATA_FAIL);
2016     }
2017     VirtualDataItem item;
2018     EXPECT_EQ(g_deviceB->GetData(key, item), -E_NOT_FOUND);
2019 }
2020 
2021 /**
2022   * @tc.name: InterceptDataFail002
2023   * @tc.desc: test intercept data failed when sync
2024   * @tc.type: FUNC
2025   * @tc.require:
2026   * @tc.author: zhangqiquan
2027   */
2028 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, InterceptDataFail002, TestSize.Level0)
2029 {
2030     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2031     /**
2032      * @tc.steps: step1. device A set intercept data errCode and B put some data
2033      * @tc.expected: step1. interface return ok
2034      */
2035     g_kvDelegatePtr->SetReceiveDataInterceptor(
__anon52917de11802(InterceptedData &data, const std::string &sourceID, const std::string &targetID) 2036         [](InterceptedData &data, const std::string &sourceID, const std::string &targetID) {
2037             auto entries = data.GetEntries();
2038             LOGD("====on receive,size=%d", entries.size());
2039             for (size_t i = 0; i < entries.size(); i++) {
2040                 Key newKey;
2041                 int errCode = data.ModifyKey(i, newKey);
2042                 if (errCode != OK) {
2043                     return errCode;
2044                 }
2045             }
2046             return E_OK;
2047         }
2048     );
2049     Key key = {'1'};
2050     Value value = {'1'};
2051     g_deviceB->PutData(key, value, 1u, 0); // 1 is timestamp
2052     /**
2053      * @tc.steps: step2. device A sync to device B and check data
2054      * @tc.expected: step2. interface return ok
2055      */
2056     std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2057     std::map<std::string, DBStatus> result;
2058     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
2059     ASSERT_TRUE(result.size() == devices.size());
2060     for (const auto &pair : result) {
2061         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2062         EXPECT_EQ(pair.second, INTERCEPT_DATA_FAIL);
2063     }
2064     Value actualValue;
2065     EXPECT_EQ(g_kvDelegatePtr->Get(key, actualValue), NOT_FOUND);
2066 }
2067 
2068 /**
2069   * @tc.name: InterceptData001
2070   * @tc.desc: test intercept receive data when sync
2071   * @tc.type: FUNC
2072   * @tc.require:
2073   * @tc.author: zhangqiquan
2074   */
2075 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, InterceptData001, TestSize.Level0)
2076 {
2077     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2078     /**
2079      * @tc.steps: step1. device A set intercept data errCode and B put some data
2080      * @tc.expected: step1. interface return ok
2081      */
2082     g_kvDelegatePtr->SetReceiveDataInterceptor(
__anon52917de11902(InterceptedData &data, const std::string &sourceID, const std::string &targetID) 2083         [](InterceptedData &data, const std::string &sourceID, const std::string &targetID) {
2084             auto entries = data.GetEntries();
2085             LOGD("====on receive,size=%d", entries.size());
2086             for (size_t i = 0; i < entries.size(); i++) {
2087                 Key newKey = {'2'};
2088                 int errCode = data.ModifyKey(i, newKey);
2089                 if (errCode != OK) {
2090                     return errCode;
2091                 }
2092                 Value newValue = {'3'};
2093                 errCode = data.ModifyValue(i, newValue);
2094                 if (errCode != OK) {
2095                     return errCode;
2096                 }
2097             }
2098             return E_OK;
2099         }
2100     );
2101     Key key = {'1'};
2102     Value value = {'1'};
2103     g_deviceB->PutData(key, value, 1u, 0); // 1 is timestamp
2104     /**
2105      * @tc.steps: step2. device A sync to device B and check data
2106      * @tc.expected: step2. interface return ok
2107      */
2108     std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2109     std::map<std::string, DBStatus> result;
2110     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
2111     ASSERT_TRUE(result.size() == devices.size());
2112     for (const auto &pair : result) {
2113         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2114         EXPECT_EQ(pair.second, OK);
2115     }
2116     Value actualValue;
2117     EXPECT_EQ(g_kvDelegatePtr->Get(key, actualValue), NOT_FOUND);
2118     key = {'2'};
2119     EXPECT_EQ(g_kvDelegatePtr->Get(key, actualValue), OK);
2120     value = {'3'};
2121     EXPECT_EQ(actualValue, value);
2122 }
2123 
2124 /**
2125   * @tc.name: UpdateKey001
2126   * @tc.desc: test update key can effect local data and sync data, without delete data
2127   * @tc.type: FUNC
2128   * @tc.require:
2129   * @tc.author: zhangqiquan
2130   */
2131 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, UpdateKey001, TestSize.Level1)
2132 {
2133     /**
2134      * @tc.steps: step1. device A set sync data (k1, v1) local data (k2, v2) (k3, v3) and delete (k4, v4)
2135      * @tc.expected: step1. put data return ok
2136      */
2137     Key k1 = {'k', '1'};
2138     Value v1 = {'v', '1'};
2139     g_deviceB->PutData(k1, v1, 1, 0);
2140     ASSERT_EQ(g_deviceB->Sync(SyncMode::SYNC_MODE_PUSH_ONLY, true), E_OK);
2141     Value actualValue;
2142     EXPECT_EQ(g_kvDelegatePtr->Get(k1, actualValue), OK);
2143     EXPECT_EQ(v1, actualValue);
2144     Key k2 = {'k', '2'};
2145     Value v2 = {'v', '2'};
2146     Key k3 = {'k', '3'};
2147     Value v3 = {'v', '3'};
2148     Key k4 = {'k', '4'};
2149     Value v4 = {'v', '4'};
2150     EXPECT_EQ(g_kvDelegatePtr->Put(k2, v2), OK);
2151     EXPECT_EQ(g_kvDelegatePtr->Put(k3, v3), OK);
2152     EXPECT_EQ(g_kvDelegatePtr->Put(k4, v4), OK);
2153     EXPECT_EQ(g_kvDelegatePtr->Delete(k4), OK);
2154     /**
2155      * @tc.steps: step2. device A update key and set
2156      * @tc.expected: step2. put data return ok
2157      */
__anon52917de11a02(const Key &originKey, Key &newKey) 2158     DBStatus status = g_kvDelegatePtr->UpdateKey([](const Key &originKey, Key &newKey) {
2159         newKey = originKey;
2160         newKey.push_back('0');
2161     });
2162     EXPECT_EQ(status, OK);
2163     k1.push_back('0');
2164     k2.push_back('0');
2165     k3.push_back('0');
2166     EXPECT_EQ(g_kvDelegatePtr->Get(k1, actualValue), OK);
2167     EXPECT_EQ(v1, actualValue);
2168     EXPECT_EQ(g_kvDelegatePtr->Get(k2, actualValue), OK);
2169     EXPECT_EQ(v2, actualValue);
2170     EXPECT_EQ(g_kvDelegatePtr->Get(k3, actualValue), OK);
2171     EXPECT_EQ(v3, actualValue);
2172 }
2173 
2174 /**
2175   * @tc.name: MetaBusy001
2176   * @tc.desc: test sync normal when update water mark busy
2177   * @tc.type: FUNC
2178   * @tc.require:
2179   * @tc.author: zhangqiquan
2180   */
2181 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, MetaBusy001, TestSize.Level1)
2182 {
2183     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2184     Key key = {'1'};
2185     Value value = {'1'};
2186     EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
2187     std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2188     std::map<std::string, DBStatus> result;
2189     ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result), OK);
2190     ASSERT_EQ(result.size(), devices.size());
2191     for (const auto &pair : result) {
2192         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2193         EXPECT_TRUE(pair.second == OK);
2194     }
2195     value = {'2'};
2196     EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
__anon52917de11b02() 2197     g_deviceB->SetSaveDataCallback([] () {
2198         RuntimeContext::GetInstance()->ScheduleTask([]() {
2199             g_deviceB->EraseWaterMark("real_device");
2200         });
2201         std::this_thread::sleep_for(std::chrono::seconds(1));
2202     });
2203     EXPECT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result), OK);
2204     EXPECT_EQ(result.size(), devices.size());
2205     for (const auto &pair : result) {
2206         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2207         EXPECT_TRUE(pair.second == OK);
2208     }
2209     g_deviceB->SetSaveDataCallback(nullptr);
2210     RuntimeContext::GetInstance()->StopTaskPool();
2211 }
2212 
2213 /**
2214  * @tc.name: TestErrCodePassthrough001
2215  * @tc.desc: Test ErrCode Passthrough when sync comm fail
2216  * @tc.type: FUNC
2217  * @tc.require:
2218  * @tc.author: suyue
2219  */
2220 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, TestErrCodePassthrough001, TestSize.Level1)
2221 {
2222     /**
2223      * @tc.steps: step1. device put data.
2224      * @tc.expected: step1. sync return OK.
2225      */
2226     std::vector<std::string> devices;
2227     devices.push_back(g_deviceB->GetDeviceId());
2228     devices.push_back(g_deviceC->GetDeviceId());
2229     Key key1 = {'1'};
2230     Value value1 = {'1'};
2231     ASSERT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
2232 
2233     /**
2234      * @tc.steps: step2. call sync and mock commErrCode is E_BASE(positive number).
2235      * @tc.expected: step2. return COMM_FAILURE.
2236      */
2237     g_communicatorAggregator->MockCommErrCode(E_BASE);
2238     std::map<std::string, DBStatus> result;
2239     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2240     ASSERT_EQ(status, OK);
2241     for (const auto &pair : result) {
2242         LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, E_BASE);
2243         EXPECT_EQ(pair.second, COMM_FAILURE);
2244     }
2245 
2246     /**
2247      * @tc.steps: step3. call sync and mock commErrCode is -E_BASE(negative number).
2248      * @tc.expected: step3. return -E_BASE.
2249      */
2250     g_communicatorAggregator->MockCommErrCode(-E_BASE);
2251     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2252     ASSERT_EQ(status, OK);
2253     for (const auto &pair : result) {
2254         LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, COMM_FAILURE);
2255         EXPECT_EQ(pair.second, static_cast<DBStatus>(-E_BASE));
2256     }
2257 
2258     /**
2259      * @tc.steps: step4. call sync and mock commErrCode is INT_MAX.
2260      * @tc.expected: step4. return COMM_FAILURE.
2261      */
2262     g_communicatorAggregator->MockCommErrCode(INT_MAX);
2263     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2264     ASSERT_EQ(status, OK);
2265     for (const auto &pair : result) {
2266         LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, INT_MAX);
2267         EXPECT_EQ(pair.second, COMM_FAILURE);
2268     }
2269 
2270     /**
2271      * @tc.steps: step5. call sync and mock commErrCode is -INT_MAX.
2272      * @tc.expected: step5. return -INT_MAX.
2273      */
2274     g_communicatorAggregator->MockCommErrCode(-INT_MAX);
2275     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2276     ASSERT_EQ(status, OK);
2277     for (const auto &pair : result) {
2278         LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, -INT_MAX);
2279         EXPECT_EQ(pair.second, -INT_MAX);
2280     }
2281     g_communicatorAggregator->MockCommErrCode(E_OK);
2282 }
2283 
2284 /**
2285   * @tc.name: TestErrCodePassthrough002
2286   * @tc.desc: Test ErrCode Passthrough when sync time out and isDirectEnd is false
2287   * @tc.type: FUNC
2288   * @tc.require:
2289   * @tc.author: suyue
2290   */
2291 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, TestErrCodePassthrough002, TestSize.Level3)
2292 {
2293     /**
2294      * @tc.steps: step1. device put data.
2295      * @tc.expected: step1. sync return OK.
2296      */
2297     std::vector<std::string> devices;
2298     devices.push_back(g_deviceB->GetDeviceId());
2299     ASSERT_EQ(g_kvDelegatePtr->Put({'k', '1'}, {'v', '1'}), OK);
2300 
2301     /**
2302      * @tc.steps: step2. set messageId invalid and isDirectEnd is false
2303      * @tc.expected: step2. make sure deviceA push data failed due to timeout
2304      */
__anon52917de11d02(const std::string &target, DistributedDB::Message *msg) 2305     g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
2306         ASSERT_NE(msg, nullptr);
2307         if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
2308             msg->SetMessageId(INVALID_MESSAGE_ID);
2309         }
2310     });
2311     g_communicatorAggregator->MockDirectEndFlag(false);
2312 
2313     /**
2314      * @tc.steps: step3. call sync and mock errCode is E_BASE(positive number).
2315      * @tc.expected: step3. return TIME_OUT.
2316      */
2317     std::map<std::string, DBStatus> result;
__anon52917de11e02(const std::map<std::string, DBStatus> &map) 2318     auto callback = [&result](const std::map<std::string, DBStatus> &map) {
2319         result = map;
2320     };
2321     Query query = Query::Select().PrefixKey({'k', '1'});
2322     g_communicatorAggregator->MockCommErrCode(E_BASE);
2323     EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
2324     EXPECT_EQ(result.size(), devices.size());
2325     EXPECT_EQ(result[DEVICE_B], TIME_OUT);
2326 
2327     /**
2328      * @tc.steps: step4. call sync and mock errCode is -E_BASE(negative number).
2329      * @tc.expected: step4. return -E_BASE.
2330      */
2331     g_communicatorAggregator->MockCommErrCode(-E_BASE);
2332     EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
2333     EXPECT_EQ(result.size(), devices.size());
2334     EXPECT_EQ(result[DEVICE_B], -E_BASE);
2335 
2336     /**
2337      * @tc.steps: step5. call sync and mock errCode is E_OK(0).
2338      * @tc.expected: step5. return TIME_OUT.
2339      */
2340     g_communicatorAggregator->MockCommErrCode(E_OK);
2341     EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
2342     EXPECT_EQ(result.size(), devices.size());
2343     EXPECT_EQ(result[DEVICE_B], TIME_OUT);
2344 
2345     /**
2346      * @tc.steps: step6. call sync and mock errCode is -INT_MAX.
2347      * @tc.expected: step6. return -INT_MAX.
2348      */
2349     g_communicatorAggregator->MockCommErrCode(-INT_MAX);
2350     EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
2351     EXPECT_EQ(result.size(), devices.size());
2352     EXPECT_EQ(result[DEVICE_B], -INT_MAX);
2353 
2354     g_communicatorAggregator->RegOnDispatch(nullptr);
2355     g_communicatorAggregator->MockCommErrCode(E_OK);
2356     g_communicatorAggregator->MockDirectEndFlag(true);
2357 }
2358