• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <condition_variable>
17 #include <gtest/gtest.h>
18 #include <thread>
19 
20 #include "db_constant.h"
21 #include "db_common.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "mock_sync_task_context.h"
27 #include "platform_specific.h"
28 #include "single_ver_data_sync.h"
29 #include "single_ver_kv_sync_task_context.h"
30 
31 using namespace testing::ext;
32 using namespace DistributedDB;
33 using namespace DistributedDBUnitTest;
34 using namespace std;
35 
36 namespace {
37     class TestSingleVerKvSyncTaskContext : public SingleVerKvSyncTaskContext {
38     public:
39         TestSingleVerKvSyncTaskContext() = default;
40     };
41     string g_testDir;
42     const string STORE_ID = "kv_stroe_complex_sync_test";
43     const int WAIT_TIME = 1000;
44     const std::string DEVICE_A = "real_device";
45     const std::string DEVICE_B = "deviceB";
46     const std::string DEVICE_C = "deviceC";
47     const std::string CREATE_SYNC_TABLE_SQL =
48     "CREATE TABLE IF NOT EXISTS sync_data(" \
49         "key         BLOB NOT NULL," \
50         "value       BLOB," \
51         "timestamp   INT  NOT NULL," \
52         "flag        INT  NOT NULL," \
53         "device      BLOB," \
54         "ori_device  BLOB," \
55         "hash_key    BLOB PRIMARY KEY NOT NULL," \
56         "w_timestamp INT);";
57 
58     KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
59     KvStoreConfig g_config;
60     DistributedDBToolsUnitTest g_tool;
61     DBStatus g_kvDelegateStatus = INVALID_ARGS;
62     KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
63     VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
64     KvVirtualDevice *g_deviceB = nullptr;
65     KvVirtualDevice *g_deviceC = nullptr;
66 
67     // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
68     auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
69         placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
70 
PullSyncTest()71     void PullSyncTest()
72     {
73         DBStatus status = OK;
74         std::vector<std::string> devices;
75         devices.push_back(g_deviceB->GetDeviceId());
76 
77         Key key = {'1'};
78         Key key2 = {'2'};
79         Value value = {'1'};
80         g_deviceB->PutData(key, value, 0, 0);
81         g_deviceB->PutData(key2, value, 1, 0);
82 
83         std::map<std::string, DBStatus> result;
84         status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
85         ASSERT_TRUE(status == OK);
86 
87         ASSERT_TRUE(result.size() == devices.size());
88         for (const auto &pair : result) {
89             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
90             EXPECT_TRUE(pair.second == OK);
91         }
92         Value value3;
93         EXPECT_EQ(g_kvDelegatePtr->Get(key, value3), OK);
94         EXPECT_EQ(value3, value);
95         EXPECT_EQ(g_kvDelegatePtr->Get(key2, value3), OK);
96         EXPECT_EQ(value3, value);
97     }
98 
CrudTest()99     void CrudTest()
100     {
101         vector<Entry> entries;
102         int totalSize = 10;
103         for (int i = 0; i < totalSize; i++) {
104             Entry entry;
105             entry.key.push_back(i);
106             entry.value.push_back('2');
107             entries.push_back(entry);
108         }
109         EXPECT_TRUE(g_kvDelegatePtr->PutBatch(entries) == OK);
110         for (const auto &entry : entries) {
111             Value resultvalue;
112             EXPECT_TRUE(g_kvDelegatePtr->Get(entry.key, resultvalue) == OK);
113             EXPECT_TRUE(resultvalue == entry.value);
114         }
115         for (int i = 0; i < totalSize / 2; i++) { // 2: Half of the total
116             g_kvDelegatePtr->Delete(entries[i].key);
117             Value resultvalue;
118             EXPECT_TRUE(g_kvDelegatePtr->Get(entries[i].key, resultvalue) == NOT_FOUND);
119         }
120         for (int i = totalSize / 2; i < totalSize; i++) {
121             Value value = entries[i].value;
122             value.push_back('x');
123             EXPECT_TRUE(g_kvDelegatePtr->Put(entries[i].key, value) == OK);
124             Value resultvalue;
125             EXPECT_TRUE(g_kvDelegatePtr->Get(entries[i].key, resultvalue) == OK);
126             EXPECT_TRUE(resultvalue == value);
127         }
128     }
129 
DataSync005()130     void DataSync005()
131     {
132         ASSERT_NE(g_communicatorAggregator, nullptr);
133         SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
134         ASSERT_TRUE(dataSync != nullptr);
135         dataSync->SendSaveDataNotifyPacket(nullptr, 0, 0, 0, TIME_SYNC_MESSAGE);
136         EXPECT_EQ(g_communicatorAggregator->GetOnlineDevices().size(), 3u); // 3 online dev
137         delete dataSync;
138     }
139 
DataSync008()140     void DataSync008()
141     {
142         SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
143         ASSERT_TRUE(dataSync != nullptr);
144         auto context = new (std::nothrow) MockSyncTaskContext();
145         dataSync->PutDataMsg(nullptr);
146         bool isNeedHandle = false;
147         bool isContinue = false;
148         EXPECT_EQ(dataSync->MoveNextDataMsg(context, isNeedHandle, isContinue), nullptr);
149         EXPECT_EQ(isNeedHandle, false);
150         EXPECT_EQ(isContinue, false);
151         delete dataSync;
152         delete context;
153     }
154 
ReSetWaterDogTest001()155     void ReSetWaterDogTest001()
156     {
157         /**
158          * @tc.steps: step1. put 10 key/value
159          * @tc.expected: step1, put return OK.
160          */
161         for (int i = 0; i < 5; i++) { // put 5 key
162             Key key = DistributedDBToolsUnitTest::GetRandPrefixKey({'a', 'b'}, 1024); // rand num 1024 for test
163             Value value;
164             DistributedDBToolsUnitTest::GetRandomKeyValue(value, 10 * 50 * 1024u); // 10 * 50 * 1024 = 500k
165             EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
166         }
167         /**
168          * @tc.steps: step2. SetDeviceMtuSize
169          * @tc.expected: step2, return OK.
170          */
171         g_communicatorAggregator->SetDeviceMtuSize(DEVICE_A, 50 * 1024u); // 50 * 1024u = 50k
172         g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, 50 * 1024u); // 50 * 1024u = 50k
173         /**
174          * @tc.steps: step3. deviceA,deviceB sync to each other at same time
175          * @tc.expected: step3. sync should return OK.
176          */
177         EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, true), E_OK);
178         g_communicatorAggregator->SetDeviceMtuSize(DEVICE_A, 5 * 1024u * 1024u); // 5 * 1024u * 1024u = 5m
179         g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, 5 * 1024u * 1024u); // 5 * 1024u * 1024u = 5m
180     }
181 }
182 
183 class DistributedDBSingleVerP2PComplexSyncTest : public testing::Test {
184 public:
185     static void SetUpTestCase(void);
186     static void TearDownTestCase(void);
187     void SetUp();
188     void TearDown();
189 };
190 
SetUpTestCase(void)191 void DistributedDBSingleVerP2PComplexSyncTest::SetUpTestCase(void)
192 {
193     /**
194      * @tc.setup: Init datadir and Virtual Communicator.
195      */
196     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
197     g_config.dataDir = g_testDir;
198     g_mgr.SetKvStoreConfig(g_config);
199 
200     string dir = g_testDir + "/single_ver";
201     DIR* dirTmp = opendir(dir.c_str());
202     if (dirTmp == nullptr) {
203         OS::MakeDBDirectory(dir);
204     } else {
205         closedir(dirTmp);
206     }
207 
208     g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
209     ASSERT_TRUE(g_communicatorAggregator != nullptr);
210     RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
211 }
212 
TearDownTestCase(void)213 void DistributedDBSingleVerP2PComplexSyncTest::TearDownTestCase(void)
214 {
215     /**
216      * @tc.teardown: Release virtual Communicator and clear data dir.
217      */
218     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
219         LOGE("rm test db files error!");
220     }
221     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
222 }
223 
SetUp(void)224 void DistributedDBSingleVerP2PComplexSyncTest::SetUp(void)
225 {
226     DistributedDBToolsUnitTest::PrintTestCaseInfo();
227     /**
228      * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
229      */
230     KvStoreNbDelegate::Option option;
231     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
232     ASSERT_TRUE(g_kvDelegateStatus == OK);
233     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
234     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
235     ASSERT_TRUE(g_deviceB != nullptr);
236     VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
237     ASSERT_TRUE(syncInterfaceB != nullptr);
238     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
239 
240     g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
241     ASSERT_TRUE(g_deviceC != nullptr);
242     VirtualSingleVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
243     ASSERT_TRUE(syncInterfaceC != nullptr);
244     ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
245 
246     auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
247         const std::string &deviceId, uint8_t flag) -> bool {
248             return true;
249         };
250     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
251 }
252 
TearDown(void)253 void DistributedDBSingleVerP2PComplexSyncTest::TearDown(void)
254 {
255     /**
256      * @tc.teardown: Release device A, B, C
257      */
258     if (g_kvDelegatePtr != nullptr) {
259         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
260         g_kvDelegatePtr = nullptr;
261         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
262         LOGD("delete kv store status %d", status);
263         ASSERT_TRUE(status == OK);
264     }
265     if (g_deviceB != nullptr) {
266         delete g_deviceB;
267         g_deviceB = nullptr;
268     }
269     if (g_deviceC != nullptr) {
270         delete g_deviceC;
271         g_deviceC = nullptr;
272     }
273     PermissionCheckCallbackV2 nullCallback;
274     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
275 }
276 
277 /**
278   * @tc.name: SaveDataNotify001
279   * @tc.desc: Test SaveDataNotify function, delay < 30s should sync ok, > 36 should timeout
280   * @tc.type: FUNC
281   * @tc.require: AR000D4876
282   * @tc.author: xushaohua
283   */
284 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SaveDataNotify001, TestSize.Level3)
285 {
286     DBStatus status = OK;
287     const int waitFiveSeconds = 5000;
288     const int waitThirtySeconds = 30000;
289     const int waitThirtySixSeconds = 36000;
290     std::vector<std::string> devices;
291     devices.push_back(g_deviceB->GetDeviceId());
292 
293     /**
294      * @tc.steps: step1. deviceA put {k1, v1}
295      */
296     Key key = {'1'};
297     Value value = {'1'};
298     status = g_kvDelegatePtr->Put(key, value);
299     ASSERT_TRUE(status == OK);
300 
301     /**
302      * @tc.steps: step2. deviceB set sava data dely 5s
303      */
304     g_deviceB->SetSaveDataDelayTime(waitFiveSeconds);
305 
306     /**
307      * @tc.steps: step3. deviceA call sync and wait
308      * @tc.expected: step3. sync should return OK. onComplete should be called, deviceB sync success.
309      */
310     std::map<std::string, DBStatus> result;
311     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
312     ASSERT_TRUE(status == OK);
313     ASSERT_TRUE(result.size() == devices.size());
314     ASSERT_TRUE(result[DEVICE_B] == OK);
315 
316     /**
317      * @tc.steps: step4. deviceB set sava data dely 30s and put {k1, v1}
318      */
319     g_deviceB->SetSaveDataDelayTime(waitThirtySeconds);
320     status = g_kvDelegatePtr->Put(key, value);
321     ASSERT_TRUE(status == OK);
322      /**
323      * @tc.steps: step3. deviceA call sync and wait
324      * @tc.expected: step3. sync should return OK. onComplete should be called, deviceB sync success.
325      */
326     result.clear();
327     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
328     ASSERT_TRUE(status == OK);
329     ASSERT_TRUE(result.size() == devices.size());
330     ASSERT_TRUE(result[DEVICE_B] == OK);
331 
332     /**
333      * @tc.steps: step4. deviceB set sava data dely 36s and put {k1, v1}
334      */
335     g_deviceB->SetSaveDataDelayTime(waitThirtySixSeconds);
336     status = g_kvDelegatePtr->Put(key, value);
337     ASSERT_TRUE(status == OK);
338     /**
339      * @tc.steps: step5. deviceA call sync and wait
340      * @tc.expected: step5. sync should return OK. onComplete should be called, deviceB sync TIME_OUT.
341      */
342     result.clear();
343     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
344     ASSERT_TRUE(status == OK);
345     ASSERT_TRUE(result.size() == devices.size());
346     ASSERT_TRUE(result[DEVICE_B] == TIME_OUT);
347 }
348 
349 /**
350   * @tc.name: SametimeSync001
351   * @tc.desc: Test 2 device sync with each other
352   * @tc.type: FUNC
353   * @tc.require: AR000CCPOM
354   * @tc.author: zhangqiquan
355   */
356 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SametimeSync001, TestSize.Level3)
357 {
358     DBStatus status = OK;
359     std::vector<std::string> devices;
360     devices.push_back(g_deviceB->GetDeviceId());
361 
362     int responseCount = 0;
363     int requestCount = 0;
364     Key key = {'1'};
365     Value value = {'1'};
366     /**
367      * @tc.steps: step1. make sure deviceB send pull firstly and response_pull secondly
368      * @tc.expected: step1. deviceA put data when finish push task. put data should return OK.
369      */
370     g_communicatorAggregator->RegOnDispatch([&responseCount, &requestCount, &key, &value](
__anon7f2dc5a00302( const std::string &target, DistributedDB::Message *msg) 371         const std::string &target, DistributedDB::Message *msg) {
372         if (target != "real_device" || msg->GetMessageId() != DATA_SYNC_MESSAGE) {
373             return;
374         }
375 
376         if (msg->GetMessageType() == TYPE_RESPONSE) {
377             responseCount++;
378             if (responseCount == 1) { // 1 is the ack which B response A's push task
379                 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), DBStatus::OK);
380                 std::this_thread::sleep_for(std::chrono::seconds(1));
381             } else if (responseCount == 2) { // 2 is the ack which B response A's response_pull task
382                 msg->SetErrorNo(E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
383             }
384         } else if (msg->GetMessageType() == TYPE_REQUEST) {
385             requestCount++;
386             if (requestCount == 1) { // 1 is A push task
387                 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2 sec
388             }
389         }
390     });
391     /**
392      * @tc.steps: step2. deviceA,deviceB sync to each other at same time
393      * @tc.expected: step2. sync should return OK.
394      */
395     std::map<std::string, DBStatus> result;
__anon7f2dc5a00402null396     std::thread subThread([]{
397         g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, true);
398     });
399     status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_PULL, result);
400     subThread.join();
401     g_communicatorAggregator->RegOnDispatch(nullptr);
402 
403     EXPECT_TRUE(status == OK);
404     ASSERT_TRUE(result.size() == devices.size());
405     EXPECT_TRUE(result[DEVICE_B] == OK);
406     Value actualValue;
407     g_kvDelegatePtr->Get(key, actualValue);
408     EXPECT_EQ(actualValue, value);
409 }
410 
411 /**
412   * @tc.name: SametimeSync002
413   * @tc.desc: Test 2 device sync with each other with water error
414   * @tc.type: FUNC
415   * @tc.require: AR000CCPOM
416   * @tc.author: zhangqiquan
417   */
418 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SametimeSync002, TestSize.Level3)
419 {
420     DBStatus status = OK;
421     std::vector<std::string> devices;
422     devices.push_back(g_deviceB->GetDeviceId());
423     g_kvDelegatePtr->Put({'k', '1'}, {'v', '1'});
424     /**
425      * @tc.steps: step1. make sure deviceA push data failed and increase water mark
426      * @tc.expected: step1. deviceA push failed with timeout
427      */
__anon7f2dc5a00502(const std::string &target, DistributedDB::Message *msg) 428     g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
429         ASSERT_NE(msg, nullptr);
430         if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
431             msg->SetMessageId(INVALID_MESSAGE_ID);
432         }
433     });
434     std::map<std::string, DBStatus> result;
__anon7f2dc5a00602(const std::map<std::string, DBStatus> &map) 435     auto callback = [&result](const std::map<std::string, DBStatus> &map) {
436         result = map;
437     };
438     Query query = Query::Select().PrefixKey({'k', '1'});
439     EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
440     ASSERT_TRUE(result.size() == devices.size());
441     EXPECT_TRUE(result[DEVICE_B] == TIME_OUT);
442     /**
443      * @tc.steps: step2. A push to B with query2, sleep 1s for waiting step3
444      * @tc.expected: step2. sync should return OK.
445      */
__anon7f2dc5a00702(const std::string &target, DistributedDB::Message *msg) 446     g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
447         ASSERT_NE(msg, nullptr);
448         if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
449             std::this_thread::sleep_for(std::chrono::seconds(1));
450         }
451     });
__anon7f2dc5a00802null452     std::thread subThread([&devices] {
453         std::map<std::string, DBStatus> result;
454         auto callback = [&result](const std::map<std::string, DBStatus> &map) {
455             result = map;
456         };
457         Query query = Query::Select().PrefixKey({'k', '2'});
458         LOGD("Begin PUSH");
459         EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
460         ASSERT_TRUE(result.size() == devices.size());
461         EXPECT_TRUE(result[DEVICE_A] == OK);
462     });
463     /**
464      * @tc.steps: step3. B pull to A when A is in push task
465      * @tc.expected: step3. sync should return OP_FINISHED_ALL.
466      */
467     std::this_thread::sleep_for(std::chrono::milliseconds(100));
468     std::map<std::string, int> virtualResult;
469     g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, query,
__anon7f2dc5a00a02(const std::map<std::string, int> &map) 470         [&virtualResult](const std::map<std::string, int> &map) {
471             virtualResult = map;
472         }, true);
473     EXPECT_TRUE(status == OK);
474     ASSERT_EQ(virtualResult.size(), devices.size());
475     EXPECT_EQ(virtualResult[DEVICE_A], SyncOperation::OP_FINISHED_ALL);
476     g_communicatorAggregator->RegOnDispatch(nullptr);
477     subThread.join();
478 }
479 
480 /**
481  * @tc.name: DatabaseOnlineCallback001
482  * @tc.desc: check database status notify online callback
483  * @tc.type: FUNC
484  * @tc.require: AR000CQS3S SR000CQE0B
485  * @tc.author: zhuwentao
486  */
487 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DatabaseOnlineCallback001, TestSize.Level1)
488 {
489     /**
490      * @tc.steps: step1. SetStoreStatusNotifier
491      * @tc.expected: step1. SetStoreStatusNotifier ok
492      */
493     std::string targetDev = "DEVICE_X";
494     bool isCheckOk = false;
495     auto databaseStatusNotifyCallback = [targetDev, &isCheckOk] (const std::string &userId,
__anon7f2dc5a00b02(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) 496         const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) -> void {
497         if (userId == USER_ID && appId == APP_ID && storeId == STORE_ID && deviceId == targetDev &&
498             onlineStatus == true) {
499             isCheckOk = true;
500         }};
501     g_mgr.SetStoreStatusNotifier(databaseStatusNotifyCallback);
502     /**
503      * @tc.steps: step2. trigger device online
504      * @tc.expected: step2. check callback ok
505      */
506     g_communicatorAggregator->OnlineDevice(targetDev);
507     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME / 20));
508     EXPECT_EQ(isCheckOk, true);
509     StoreStatusNotifier nullCallback;
510     g_mgr.SetStoreStatusNotifier(nullCallback);
511 }
512 
513 /**
514  * @tc.name: DatabaseOfflineCallback001
515  * @tc.desc: check database status notify online callback
516  * @tc.type: FUNC
517  * @tc.require: AR000CQS3S SR000CQE0B
518  * @tc.author: zhuwentao
519  */
520 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DatabaseOfflineCallback001, TestSize.Level1)
521 {
522     /**
523      * @tc.steps: step1. SetStoreStatusNotifier
524      * @tc.expected: step1. SetStoreStatusNotifier ok
525      */
526     std::string targetDev = "DEVICE_X";
527     bool isCheckOk = false;
528     auto databaseStatusNotifyCallback = [targetDev, &isCheckOk] (const std::string &userId,
__anon7f2dc5a00c02(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) 529         const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) -> void {
530         if (userId == USER_ID && appId == APP_ID && storeId == STORE_ID && deviceId == targetDev &&
531             onlineStatus == false) {
532             isCheckOk = true;
533         }};
534     g_mgr.SetStoreStatusNotifier(databaseStatusNotifyCallback);
535     /**
536      * @tc.steps: step2. trigger device offline
537      * @tc.expected: step2. check callback ok
538      */
539     g_communicatorAggregator->OfflineDevice(targetDev);
540     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME / 20));
541     EXPECT_EQ(isCheckOk, true);
542     StoreStatusNotifier nullCallback;
543     g_mgr.SetStoreStatusNotifier(nullCallback);
544 }
545 
546 /**
547   * @tc.name: CloseSync001
548   * @tc.desc: Test 2 delegate close when sync
549   * @tc.type: FUNC
550   * @tc.require: AR000CCPOM
551   * @tc.author: zhangqiquan
552   */
553 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, CloseSync001, TestSize.Level3)
554 {
555     DBStatus status = OK;
556     std::vector<std::string> devices;
557     devices.push_back(g_deviceB->GetDeviceId());
558 
559     /**
560      * @tc.steps: step1. make sure A sync start
561      */
562     bool sleep = false;
__anon7f2dc5a00d02(const std::string &target, DistributedDB::Message *msg) 563     g_communicatorAggregator->RegOnDispatch([&sleep](const std::string &target, DistributedDB::Message *msg) {
564         if (!sleep) {
565             sleep = true;
566             std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s for waiting close db
567         }
568     });
569 
570     KvStoreNbDelegate* kvDelegatePtrA = nullptr;
571     KvStoreNbDelegate::Option option;
__anon7f2dc5a00e02(DBStatus s, KvStoreNbDelegate *delegate) 572     g_mgr.GetKvStore(STORE_ID, option, [&status, &kvDelegatePtrA](DBStatus s, KvStoreNbDelegate *delegate) {
573         status = s;
574         kvDelegatePtrA = delegate;
575     });
576     EXPECT_EQ(status, OK);
577     EXPECT_NE(kvDelegatePtrA, nullptr);
578 
579     Key key = {'k'};
580     Value value = {'v'};
581     kvDelegatePtrA->Put(key, value);
582     std::map<std::string, DBStatus> result;
__anon7f2dc5a00f02(const std::map<std::string, DBStatus>& statusMap) 583     auto callback = [&result](const std::map<std::string, DBStatus>& statusMap) {
584         result = statusMap;
585     };
586     /**
587      * @tc.steps: step2. deviceA sync and then close
588      * @tc.expected: step2. sync should abort and data don't exist in B
589      */
__anon7f2dc5a01002() 590     std::thread closeThread([&kvDelegatePtrA]() {
591         std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for waiting sync start
592         EXPECT_EQ(g_mgr.CloseKvStore(kvDelegatePtrA), OK);
593     });
594     EXPECT_EQ(kvDelegatePtrA->Sync(devices, SYNC_MODE_PUSH_ONLY, callback, false), OK);
595     LOGD("Sync finish");
596     closeThread.join();
597     std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s for waiting sync finish
598     EXPECT_EQ(result.size(), 0u);
599     VirtualDataItem actualValue;
600     EXPECT_EQ(g_deviceB->GetData(key, actualValue), -E_NOT_FOUND);
601     g_communicatorAggregator->RegOnDispatch(nullptr);
602 }
603 
604 /**
605   * @tc.name: CloseSync002
606   * @tc.desc: Test 1 delegate close when in time sync
607   * @tc.type: FUNC
608   * @tc.require: AR000CCPOM
609   * @tc.author: zhangqiquan
610   */
611 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, CloseSync002, TestSize.Level3)
612 {
613     /**
614      * @tc.steps: step1. invalid time sync packet from A
615      */
__anon7f2dc5a01102(const std::string &target, DistributedDB::Message *msg) 616     g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
617         ASSERT_NE(msg, nullptr);
618         if (target == DEVICE_B && msg->GetMessageId() == TIME_SYNC_MESSAGE && msg->GetMessageType() == TYPE_REQUEST) {
619             msg->SetMessageId(INVALID_MESSAGE_ID);
620             LOGD("Message is invalid");
621         }
622     });
623     Timestamp currentTime;
624     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
625     g_deviceB->PutData({'k'}, {'v'}, currentTime, 0);
626 
627     /**
628      * @tc.steps: step2. B PUSH to A and A close after 1s
629      * @tc.expected: step2. A closing time cost letter than 4s
630      */
__anon7f2dc5a01202() 631     std::thread closingThread([]() {
632         std::this_thread::sleep_for(std::chrono::seconds(1));
633         LOGD("Begin Close");
634         Timestamp beginTime;
635         (void)OS::GetCurrentSysTimeInMicrosecond(beginTime);
636         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
637         Timestamp endTime;
638         (void)OS::GetCurrentSysTimeInMicrosecond(endTime);
639         EXPECT_LE(static_cast<int>(endTime - beginTime), 4 * 1000 * 1000); // waiting 4 * 1000 * 1000 us
640         LOGD("End Close");
641     });
642     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
643     closingThread.join();
644 
645     /**
646      * @tc.steps: step3. remove db
647      * @tc.expected: step3. remove ok
648      */
649     g_kvDelegatePtr = nullptr;
650     DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
651     LOGD("delete kv store status %d", status);
652     ASSERT_TRUE(status == OK);
653     g_communicatorAggregator->RegOnDispatch(nullptr);
654 }
655 
656 /**
657   * @tc.name: OrderbyWriteTimeSync001
658   * @tc.desc: sync query with order by writeTime
659   * @tc.type: FUNC
660   * @tc.require: AR000H5VLO
661   * @tc.author: zhuwentao
662   */
663 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, OrderbyWriteTimeSync001, TestSize.Level0)
664 {
665     /**
666      * @tc.steps: step1. deviceA subscribe query with order by write time
667      * * @tc.expected: step1. interface return not support
668     */
669     std::vector<std::string> devices;
670     devices.push_back(g_deviceB->GetDeviceId());
671     Query query = Query::Select().PrefixKey({'k'}).OrderByWriteTime(true);
672     EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, nullptr, query, true), NOT_SUPPORT);
673 }
674 
675 
676 /**
677  * @tc.name: Device Offline Sync 001
678  * @tc.desc: Test push sync when device offline
679  * @tc.type: FUNC
680  * @tc.require: AR000CCPOM
681  * @tc.author: xushaohua
682  */
683 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSync001, TestSize.Level1)
684 {
685     std::vector<std::string> devices;
686     devices.push_back(g_deviceB->GetDeviceId());
687     devices.push_back(g_deviceC->GetDeviceId());
688 
689     /**
690      * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2}, {k3 delete}, {k4,v2}
691      */
692     Key key1 = {'1'};
693     Value value1 = {'1'};
694     ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value1) == OK);
695 
696     Key key2 = {'2'};
697     Value value2 = {'2'};
698     ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value2) == OK);
699 
700     Key key3 = {'3'};
701     Value value3 = {'3'};
702     ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value3) == OK);
703     ASSERT_TRUE(g_kvDelegatePtr->Delete(key3) == OK);
704 
705     Key key4 = {'4'};
706     Value value4 = {'4'};
707     ASSERT_TRUE(g_kvDelegatePtr->Put(key4, value4) == OK);
708 
709     /**
710      * @tc.steps: step2. deviceB offline
711      */
712     g_deviceB->Offline();
713 
714     /**
715      * @tc.steps: step3. deviceA call pull sync
716      * @tc.expected: step3. sync should return OK.
717      */
718     std::map<std::string, DBStatus> result;
719     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
720     ASSERT_TRUE(status == OK);
721 
722     /**
723      * @tc.expected: step3. onComplete should be called, DeviceB status is timeout
724      *     deviceC has {k1, v1}, {k2, v2}, {k3 delete}, {k4,v4}
725      */
726     for (const auto &pair : result) {
727         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
728         if (pair.first == DEVICE_B) {
729             EXPECT_TRUE(pair.second == COMM_FAILURE);
730         } else {
731             EXPECT_TRUE(pair.second == OK);
732         }
733     }
734     VirtualDataItem item;
735     g_deviceC->GetData(key1, item);
736     EXPECT_TRUE(item.value == value1);
737     item.value.clear();
738     g_deviceC->GetData(key2, item);
739     EXPECT_TRUE(item.value == value2);
740     item.value.clear();
741     Key hashKey;
742     DistributedDBToolsUnitTest::CalcHash(key3, hashKey);
743     EXPECT_TRUE(g_deviceC->GetData(hashKey, item) == -E_NOT_FOUND);
744     item.value.clear();
745     g_deviceC->GetData(key4, item);
746     EXPECT_TRUE(item.value == value4);
747 }
748 
749 /**
750  * @tc.name: Device Offline Sync 002
751  * @tc.desc: Test pull sync when device offline
752  * @tc.type: FUNC
753  * @tc.require: AR000CCPOM
754  * @tc.author: xushaohua
755  */
756 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSync002, TestSize.Level1)
757 {
758     std::vector<std::string> devices;
759     devices.push_back(g_deviceB->GetDeviceId());
760     devices.push_back(g_deviceC->GetDeviceId());
761 
762     /**
763      * @tc.steps: step1. deviceB put {k1, v1}
764      */
765     Key key1 = {'1'};
766     Value value1 = {'1'};
767     g_deviceB->PutData(key1, value1, 0, 0);
768 
769     /**
770      * @tc.steps: step2. deviceB offline
771      */
772     g_deviceB->Offline();
773 
774     /**
775      * @tc.steps: step3. deviceC put {k2, v2}, {k3, delete}, {k4, v4}
776      */
777     Key key2 = {'2'};
778     Value value2 = {'2'};
779     g_deviceC->PutData(key2, value2, 0, 0);
780 
781     Key key3 = {'3'};
782     Value value3 = {'3'};
783     g_deviceC->PutData(key3, value3, 0, 1);
784 
785     Key key4 = {'4'};
786     Value value4 = {'4'};
787     g_deviceC->PutData(key4, value4, 0, 0);
788 
789     /**
790      * @tc.steps: step2. deviceA call pull sync
791      * @tc.expected: step2. sync should return OK.
792      */
793     std::map<std::string, DBStatus> result;
794     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
795     ASSERT_TRUE(status == OK);
796 
797     /**
798      * @tc.expected: step3. onComplete should be called, DeviceB status is timeout
799      *     deviceA has {k2, v2}, {k3 delete}, {k4,v4}
800      */
801     for (const auto &pair : result) {
802         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
803         if (pair.first == DEVICE_B) {
804             EXPECT_TRUE(pair.second == COMM_FAILURE);
805         } else {
806             EXPECT_TRUE(pair.second == OK);
807         }
808     }
809 
810     Value value5;
811     EXPECT_TRUE(g_kvDelegatePtr->Get(key1, value5) != OK);
812     g_kvDelegatePtr->Get(key2, value5);
813     EXPECT_EQ(value5, value2);
814     EXPECT_TRUE(g_kvDelegatePtr->Get(key3, value5) != OK);
815     g_kvDelegatePtr->Get(key4, value5);
816     EXPECT_EQ(value5, value4);
817 }
818 
819 /**
820   * @tc.name: EncryptedAlgoUpgrade001
821   * @tc.desc: Test upgrade encrypted db can sync normally
822   * @tc.type: FUNC
823   * @tc.require: AR000HI2JS
824   * @tc.author: zhuwentao
825   */
826 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, EncryptedAlgoUpgrade001, TestSize.Level3)
827 {
828     /**
829      * @tc.steps: step1. clear db
830      * * @tc.expected: step1. interface return ok
831     */
832     if (g_kvDelegatePtr != nullptr) {
833         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
834         g_kvDelegatePtr = nullptr;
835         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
836         LOGD("delete kv store status %d", status);
837         ASSERT_TRUE(status == OK);
838     }
839 
840     CipherPassword passwd;
841     std::vector<uint8_t> passwdVect = {'p', 's', 'd', '1'};
842     passwd.SetValue(passwdVect.data(), passwdVect.size());
843     /**
844      * @tc.steps: step2. open old db by sql
845      * * @tc.expected: step2. interface return ok
846     */
847     std::string identifier = DBCommon::GenerateIdentifierId(STORE_ID, APP_ID, USER_ID);
848     std::string hashDir = DBCommon::TransferHashString(identifier);
849     std::string hexHashDir = DBCommon::TransferStringToHex(hashDir);
850     std::string dbPath = g_testDir + "/" + hexHashDir + "/single_ver";
851     ASSERT_TRUE(DBCommon::CreateDirectory(g_testDir + "/" + hexHashDir) == OK);
852     ASSERT_TRUE(DBCommon::CreateDirectory(dbPath) == OK);
853     std::vector<std::string> dbDir {DBConstant::MAINDB_DIR, DBConstant::METADB_DIR, DBConstant::CACHEDB_DIR};
854     for (const auto &item : dbDir) {
855         ASSERT_TRUE(DBCommon::CreateDirectory(dbPath + "/" + item) == OK);
856     }
857     uint64_t flag = SQLITE_OPEN_URI | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
858     sqlite3 *db;
859     std::string fileUrl = dbPath + "/" + DBConstant::MAINDB_DIR + "/" + DBConstant::SINGLE_VER_DATA_STORE + ".db";
860     ASSERT_TRUE(sqlite3_open_v2(fileUrl.c_str(), &db, flag, nullptr) == SQLITE_OK);
861     SQLiteUtils::SetKeyInner(db, CipherType::AES_256_GCM, passwd, DBConstant::DEFAULT_ITER_TIMES);
862     /**
863      * @tc.steps: step3. create table and close
864      * * @tc.expected: step3. interface return ok
865     */
866     ASSERT_TRUE(SQLiteUtils::ExecuteRawSQL(db, CREATE_SYNC_TABLE_SQL) == E_OK);
867     sqlite3_close_v2(db);
868     db = nullptr;
869     LOGI("create old db success");
870     /**
871      * @tc.steps: step4. get kvstore
872      * * @tc.expected: step4. interface return ok
873     */
874     KvStoreNbDelegate::Option option;
875     option.isEncryptedDb = true;
876     option.cipher = CipherType::AES_256_GCM;
877     option.passwd = passwd;
878     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
879     ASSERT_TRUE(g_kvDelegateStatus == OK);
880     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
881     /**
882      * @tc.steps: step5. sync ok
883      * * @tc.expected: step5. interface return ok
884     */
885     PullSyncTest();
886     /**
887      * @tc.steps: step5. crud ok
888      * * @tc.expected: step5. interface return ok
889     */
890     CrudTest();
891 }
892 
893 /**
894   * @tc.name: RemoveDeviceData002
895   * @tc.desc: test remove device data before sync
896   * @tc.type: FUNC
897   * @tc.require:
898   * @tc.author: zhuwentao
899   */
900 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RemoveDeviceData002, TestSize.Level1)
901 {
902     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
903     /**
904      * @tc.steps: step1. sync deviceB data to A and check data
905      * * @tc.expected: step1. interface return ok
906     */
907     Key key1 = {'1'};
908     Key key2 = {'2'};
909     Value value = {'1'};
910     Timestamp currentTime;
911     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
912     EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
913     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
914     EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
915     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
916     Value actualValue;
917     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
918     EXPECT_EQ(actualValue, value);
919     actualValue.clear();
920     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
921     EXPECT_EQ(actualValue, value);
922     /**
923      * @tc.steps: step2. call RemoveDeviceData
924      * * @tc.expected: step2. interface return ok
925     */
926     g_kvDelegatePtr->RemoveDeviceData(g_deviceB->GetDeviceId());
927     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
928     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), NOT_FOUND);
929     /**
930      * @tc.steps: step3. sync to device A again and check data
931      * * @tc.expected: step3. sync ok
932     */
933     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
934     actualValue.clear();
935     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
936     EXPECT_EQ(actualValue, value);
937     actualValue.clear();
938     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
939     EXPECT_EQ(actualValue, value);
940 }
941 
942 /**
943   * @tc.name: DataSync001
944   * @tc.desc: Test Data Sync when Initialize
945   * @tc.type: FUNC
946   * @tc.require: AR000HI2JS
947   * @tc.author: zhuwentao
948   */
949 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync001, TestSize.Level1)
950 {
951     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
952     ASSERT_TRUE(dataSync != nullptr);
953     std::shared_ptr<Metadata> inMetadata = nullptr;
954     std::string deviceId;
955     Message message;
956     VirtualSingleVerSyncDBInterface tmpInterface;
957     VirtualCommunicator tmpCommunicator(deviceId, g_communicatorAggregator);
958     EXPECT_EQ(dataSync->Initialize(nullptr, nullptr, inMetadata, deviceId), -E_INVALID_ARGS);
959     EXPECT_EQ(dataSync->Initialize(&tmpInterface, nullptr, inMetadata, deviceId), -E_INVALID_ARGS);
960     EXPECT_EQ(dataSync->Initialize(&tmpInterface, &tmpCommunicator, inMetadata, deviceId), -E_INVALID_ARGS);
961     delete dataSync;
962 }
963 
964 /**
965   * @tc.name: DataSync002
966   * @tc.desc: Test active sync with invalid param in DataSync Class
967   * @tc.type: FUNC
968   * @tc.require: AR000HI2JS
969   * @tc.author: zhuwentao
970   */
971 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync002, TestSize.Level1)
972 {
973     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
974     ASSERT_TRUE(dataSync != nullptr);
975     Message message;
976     EXPECT_EQ(dataSync->TryContinueSync(nullptr, &message), -E_INVALID_ARGS);
977     EXPECT_EQ(dataSync->TryContinueSync(nullptr, nullptr), -E_INVALID_ARGS);
978     EXPECT_EQ(dataSync->PushStart(nullptr), -E_INVALID_ARGS);
979     EXPECT_EQ(dataSync->PushPullStart(nullptr), -E_INVALID_ARGS);
980     EXPECT_EQ(dataSync->PullRequestStart(nullptr), -E_INVALID_ARGS);
981     EXPECT_EQ(dataSync->PullResponseStart(nullptr), -E_INVALID_ARGS);
982     delete dataSync;
983 }
984 
985 /**
986   * @tc.name: DataSync003
987   * @tc.desc: Test receive invalid request data packet in DataSync Class
988   * @tc.type: FUNC
989   * @tc.require: AR000HI2JS
990   * @tc.author: zhuwentao
991   */
992 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync003, TestSize.Level1)
993 {
994     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
995     ASSERT_TRUE(dataSync != nullptr);
996     uint64_t tmpMark = 0;
997     Message message;
998     EXPECT_EQ(dataSync->DataRequestRecv(nullptr, nullptr, tmpMark), -E_INVALID_ARGS);
999     EXPECT_EQ(dataSync->DataRequestRecv(nullptr, &message, tmpMark), -E_INVALID_ARGS);
1000     delete dataSync;
1001 }
1002 
1003 /**
1004   * @tc.name: DataSync004
1005   * @tc.desc: Test receive invalid ack packet in DataSync Class
1006   * @tc.type: FUNC
1007   * @tc.require: AR000HI2JS
1008   * @tc.author: zhuwentao
1009   */
1010 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync004, TestSize.Level1)
1011 {
1012     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1013     ASSERT_TRUE(dataSync != nullptr);
1014     Message message;
1015     TestSingleVerKvSyncTaskContext tmpContext;
1016     EXPECT_EQ(dataSync->AckPacketIdCheck(nullptr), false);
1017     EXPECT_EQ(dataSync->AckPacketIdCheck(&message), false);
1018     EXPECT_EQ(dataSync->AckRecv(&tmpContext, nullptr), -E_INVALID_ARGS);
1019     EXPECT_EQ(dataSync->AckRecv(nullptr, nullptr), -E_INVALID_ARGS);
1020     EXPECT_EQ(dataSync->AckRecv(nullptr, &message), -E_INVALID_ARGS);
1021     delete dataSync;
1022 }
1023 
1024 /**
1025   * @tc.name: DataSync005
1026   * @tc.desc: Test receive invalid notify packet in DataSync Class
1027   * @tc.type: FUNC
1028   * @tc.require: AR000HI2JS
1029   * @tc.author: zhuwentao
1030   */
1031 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync005, TestSize.Level1)
1032 {
    // The scenario is implemented by the DataSync005() helper defined outside this test body.
    // ASSERT_NO_FATAL_FAILURE fails this test if that helper records a fatal (ASSERT_*) failure.
1033     ASSERT_NO_FATAL_FAILURE(DataSync005());
1034 }
1035 
1036 /**
1037   * @tc.name: DataSync006
1038   * @tc.desc: Test control start with invalid param in DataSync Class
1039   * @tc.type: FUNC
1040   * @tc.require: AR000HI2JS
1041   * @tc.author: zhuwentao
1042   */
1043 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync006, TestSize.Level1)
1044 {
1045     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1046     ASSERT_TRUE(dataSync != nullptr);
1047     TestSingleVerKvSyncTaskContext tmpContext;
1048     EXPECT_EQ(dataSync->ControlCmdStart(nullptr), -E_INVALID_ARGS);
1049     EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_INVALID_ARGS);
1050     std::shared_ptr<SubscribeManager> subManager = std::make_shared<SubscribeManager>();
1051     tmpContext.SetSubscribeManager(subManager);
1052     tmpContext.SetMode(SyncModeType::INVALID_MODE);
1053     EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_INVALID_ARGS);
1054     std::set<Key> Keys = {{'a'}, {'b'}};
1055     Query query = Query::Select().InKeys(Keys);
1056     QuerySyncObject innerQuery(query);
1057     tmpContext.SetQuery(innerQuery);
1058     tmpContext.SetMode(SyncModeType::SUBSCRIBE_QUERY);
1059     EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_NOT_SUPPORT);
1060     delete dataSync;
1061     subManager = nullptr;
1062 }
1063 
1064 /**
1065   * @tc.name: DataSync007
1066   * @tc.desc: Test receive invalid control packet in DataSync Class
1067   * @tc.type: FUNC
1068   * @tc.require: AR000HI2JS
1069   * @tc.author: zhuwentao
1070   */
1071 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync007, TestSize.Level1)
1072 {
1073     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1074     ASSERT_TRUE(dataSync != nullptr);
1075     Message message;
1076     ControlRequestPacket packet;
1077     TestSingleVerKvSyncTaskContext tmpContext;
1078     EXPECT_EQ(dataSync->ControlCmdRequestRecv(nullptr, &message), -E_INVALID_ARGS);
1079     message.SetCopiedObject(packet);
1080     EXPECT_EQ(dataSync->ControlCmdRequestRecv(nullptr, &message), -E_INVALID_ARGS);
1081     delete dataSync;
1082 }
1083 
1084 /**
1085   * @tc.name: DataSync008
1086   * @tc.desc: Test pull null msg in dataQueue in DataSync Class
1087   * @tc.type: FUNC
1088   * @tc.require: AR000HI2JS
1089   * @tc.author: zhuwentao
1090   */
1091 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync008, TestSize.Level1)
1092 {
    // The scenario is implemented by the DataSync008() helper defined outside this test body.
    // ASSERT_NO_FATAL_FAILURE fails this test if that helper records a fatal (ASSERT_*) failure.
1093     ASSERT_NO_FATAL_FAILURE(DataSync008());
1094 }
1095 
1096 /**
1097  * @tc.name: SyncRetry001
1098  * @tc.desc: use sync retry sync use push
1099  * @tc.type: FUNC
1100  * @tc.require: AR000CKRTD AR000CQE0E
1101  * @tc.author: zhuwentao
1102  */
1103 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry001, TestSize.Level3)
1104 {
1105     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1106     std::vector<std::string> devices;
1107     devices.push_back(g_deviceB->GetDeviceId());
1108 
1109     /**
1110      * @tc.steps: step1. set sync retry
1111      * @tc.expected: step1, Pragma return OK.
1112      */
1113     int pragmaData = 1;
1114     PragmaData input = static_cast<PragmaData>(&pragmaData);
1115     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1116 
1117     /**
1118      * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
1119      */
1120     ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
1121 
1122     /**
1123      * @tc.steps: step3. deviceA call sync and wait
1124      * @tc.expected: step3. sync should return OK.
1125      */
1126     std::map<std::string, DBStatus> result;
1127     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1128 
1129     /**
1130      * @tc.expected: step4. onComplete should be called, and status is time_out
1131      */
1132     ASSERT_TRUE(result.size() == devices.size());
1133     for (const auto &pair : result) {
1134         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1135         EXPECT_TRUE(pair.second == OK);
1136     }
1137     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1138 }
1139 
1140 /**
1141  * @tc.name: SyncRetry002
1142  * @tc.desc: use sync retry sync use pull
1143  * @tc.type: FUNC
1144  * @tc.require: AR000CKRTD AR000CQE0E
1145  * @tc.author: zhuwentao
1146  */
1147 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry002, TestSize.Level3)
1148 {
1149     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE, 4u);
1150     std::vector<std::string> devices;
1151     devices.push_back(g_deviceB->GetDeviceId());
1152 
1153     /**
1154      * @tc.steps: step1. set sync retry
1155      * @tc.expected: step1, Pragma return OK.
1156      */
1157     int pragmaData = 1;
1158     PragmaData input = static_cast<PragmaData>(&pragmaData);
1159     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1160 
1161     /**
1162      * @tc.steps: step2. deviceA call sync and wait
1163      * @tc.expected: step2. sync should return OK.
1164      */
1165     std::map<std::string, DBStatus> result;
1166     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
1167 
1168     /**
1169      * @tc.expected: step3. onComplete should be called, and status is time_out
1170      */
1171     ASSERT_TRUE(result.size() == devices.size());
1172     for (const auto &pair : result) {
1173         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1174         EXPECT_TRUE(pair.second == TIME_OUT);
1175     }
1176     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1177 }
1178 
1179 /**
1180  * @tc.name: SyncRetry003
1181  * @tc.desc: use sync retry sync use push by compress
1182  * @tc.type: FUNC
1183  * @tc.require: AR000CKRTD AR000CQE0E
1184  * @tc.author: zhuwentao
1185  */
1186 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry003, TestSize.Level3)
1187 {
1188     if (g_kvDelegatePtr != nullptr) {
1189         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1190         g_kvDelegatePtr = nullptr;
1191     }
1192     /**
1193      * @tc.steps: step1. open db use Compress
1194      * @tc.expected: step1, Pragma return OK.
1195      */
1196     KvStoreNbDelegate::Option option;
1197     option.isNeedCompressOnSync = true;
1198     option.compressionRate = 70;
1199     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1200     ASSERT_TRUE(g_kvDelegateStatus == OK);
1201     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1202 
1203     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1204     std::vector<std::string> devices;
1205     devices.push_back(g_deviceB->GetDeviceId());
1206 
1207     /**
1208      * @tc.steps: step2. set sync retry
1209      * @tc.expected: step2, Pragma return OK.
1210      */
1211     int pragmaData = 1;
1212     PragmaData input = static_cast<PragmaData>(&pragmaData);
1213     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1214 
1215     /**
1216      * @tc.steps: step3. deviceA put {k1, v1}, {k2, v2}
1217      */
1218     ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
1219 
1220     /**
1221      * @tc.steps: step4. deviceA call sync and wait
1222      * @tc.expected: step4. sync should return OK.
1223      */
1224     std::map<std::string, DBStatus> result;
1225     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1226 
1227     /**
1228      * @tc.expected: step5. onComplete should be called, and status is time_out
1229      */
1230     ASSERT_TRUE(result.size() == devices.size());
1231     for (const auto &pair : result) {
1232         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1233         EXPECT_TRUE(pair.second == OK);
1234     }
1235     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1236 }
1237 
1238 /**
1239  * @tc.name: SyncRetry004
1240  * @tc.desc: use query sync retry sync use push
1241  * @tc.type: FUNC
1242  * @tc.require: AR000CKRTD AR000CQE0E
1243  * @tc.author: zhuwentao
1244  */
1245 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry004, TestSize.Level3)
1246 {
1247     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1248     std::vector<std::string> devices;
1249     devices.push_back(g_deviceB->GetDeviceId());
1250 
1251     /**
1252      * @tc.steps: step1. set sync retry
1253      * @tc.expected: step1, Pragma return OK.
1254      */
1255     int pragmaData = 1;
1256     PragmaData input = static_cast<PragmaData>(&pragmaData);
1257     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1258 
1259     /**
1260      * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
1261      */
1262     for (int i = 0; i < 5; i++) {
1263         Key key = DistributedDBToolsUnitTest::GetRandPrefixKey({'a', 'b'}, 128); // rand num 1024 for test
1264         Value value;
1265         DistributedDBToolsUnitTest::GetRandomKeyValue(value, 256u);
1266         EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
1267     }
1268 
1269     /**
1270      * @tc.steps: step3. deviceA call sync and wait
1271      * @tc.expected: step3. sync should return OK.
1272      */
1273     std::map<std::string, DBStatus> result;
1274     std::vector<uint8_t> prefixKey({'a', 'b'});
1275     Query query = Query::Select().PrefixKey(prefixKey);
1276     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query) == OK);
1277 
1278     /**
1279      * @tc.expected: step4. onComplete should be called, and status is time_out
1280      */
1281     ASSERT_TRUE(result.size() == devices.size());
1282     for (const auto &pair : result) {
1283         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1284         EXPECT_TRUE(pair.second == OK);
1285     }
1286     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1287 }
1288 
1289 /**
1290  * @tc.name: SyncRetry005
1291  * @tc.desc: use sync retry sync use pull by compress
1292  * @tc.type: FUNC
1293  * @tc.require: AR000CKRTD AR000CQE0E
1294  * @tc.author: zhangqiquan
1295  */
1296 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry005, TestSize.Level3)
1297 {
1298     if (g_kvDelegatePtr != nullptr) {
1299         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1300         g_kvDelegatePtr = nullptr;
1301     }
1302     /**
1303      * @tc.steps: step1. open db use Compress
1304      * @tc.expected: step1, Pragma return OK.
1305      */
1306     KvStoreNbDelegate::Option option;
1307     option.isNeedCompressOnSync = true;
1308     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1309     ASSERT_TRUE(g_kvDelegateStatus == OK);
1310     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1311 
1312     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1313     std::vector<std::string> devices;
1314     devices.push_back(g_deviceB->GetDeviceId());
1315 
1316     /**
1317      * @tc.steps: step2. set sync retry
1318      * @tc.expected: step2, Pragma return OK.
1319      */
1320     int pragmaData = 1;
1321     PragmaData input = static_cast<PragmaData>(&pragmaData);
1322     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1323 
1324     /**
1325      * @tc.steps: step3. deviceA call sync and wait
1326      * @tc.expected: step3. sync should return OK.
1327      */
1328     std::map<std::string, DBStatus> result;
1329     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
1330 
1331     /**
1332      * @tc.expected: step4. onComplete should be called, and status is time_out
1333      */
1334     ASSERT_TRUE(result.size() == devices.size());
1335     for (const auto &pair : result) {
1336         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1337         EXPECT_EQ(pair.second, OK);
1338     }
1339     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1340 }
1341 
1342 /**
1343  * @tc.name: ReSetWatchDogTest001
1344  * @tc.desc: trigger resetWatchDog while pull
1345  * @tc.type: FUNC
1346  * @tc.require: AR000CKRTD AR000CQE0E
1347  * @tc.author: zhuwentao
1348  */
1349 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, ReSetWaterDogTest001, TestSize.Level3)
1350 {
    // The scenario is implemented by the ReSetWaterDogTest001() helper defined outside this test body.
    // ASSERT_NO_FATAL_FAILURE fails this test if that helper records a fatal (ASSERT_*) failure.
    // NOTE(review): the @tc.name header of this test reads "ReSetWatchDogTest001" while the registered
    // test and the helper are spelled "ReSetWaterDogTest001" - confirm which spelling is intended.
1351     ASSERT_NO_FATAL_FAILURE(ReSetWaterDogTest001());
1352 }
1353 
1354 /**
1355   * @tc.name: RebuildSync001
1356   * @tc.desc: rebuild db and sync again
1357   * @tc.type: FUNC
1358   * @tc.require:
1359   * @tc.author: zhuwentao
1360   */
1361 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync001, TestSize.Level3)
1362 {
1363     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1364     /**
1365      * @tc.steps: step1. sync deviceB data to A and check data
1366      * * @tc.expected: step1. interface return ok
1367     */
1368     Key key1 = {'1'};
1369     Key key2 = {'2'};
1370     Value value = {'1'};
1371     Timestamp currentTime;
1372     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1373     EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1374     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1375     EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
1376     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1377 
1378     Value actualValue;
1379     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1380     EXPECT_EQ(actualValue, value);
1381     actualValue.clear();
1382     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1383     EXPECT_EQ(actualValue, value);
1384     /**
1385      * @tc.steps: step2. delete db and rebuild
1386      * * @tc.expected: step2. interface return ok
1387     */
1388     g_mgr.CloseKvStore(g_kvDelegatePtr);
1389     g_kvDelegatePtr = nullptr;
1390     ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1391     KvStoreNbDelegate::Option option;
1392     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1393     ASSERT_TRUE(g_kvDelegateStatus == OK);
1394     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1395     /**
1396      * @tc.steps: step3. sync to device A again
1397      * * @tc.expected: step3. sync ok
1398     */
1399     value = {'2'};
1400     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1401     EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1402     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1403     /**
1404      * @tc.steps: step4. check data in device A
1405      * * @tc.expected: step4. check ok
1406     */
1407     actualValue.clear();
1408     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1409     EXPECT_EQ(actualValue, value);
1410 }
1411 
1412 /**
1413   * @tc.name: RebuildSync002
1414   * @tc.desc: test clear remote data when receive data
1415   * @tc.type: FUNC
1416   * @tc.require:
1417   * @tc.author: zhuwentao
1418   */
1419 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync002, TestSize.Level1)
1420 {
1421     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1422     std::vector<std::string> devices;
1423     devices.push_back(g_deviceB->GetDeviceId());
1424     /**
1425      * @tc.steps: step1. device A SET_WIPE_POLICY
1426      * * @tc.expected: step1. interface return ok
1427     */
1428     int pragmaData = 2; // 2 means enable
1429     PragmaData input = static_cast<PragmaData>(&pragmaData);
1430     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_WIPE_POLICY, input) == OK);
1431     /**
1432      * @tc.steps: step2. sync deviceB data to A and check data
1433      * * @tc.expected: step2. interface return ok
1434     */
1435     Key key1 = {'1'};
1436     Key key2 = {'2'};
1437     Key key3 = {'3'};
1438     Key key4 = {'4'};
1439     Value value = {'1'};
1440     Timestamp currentTime;
1441     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1442     EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1443     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1444     EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
1445     EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
1446     /**
1447      * @tc.steps: step3. deviceA call pull sync
1448      * @tc.expected: step3. sync should return OK.
1449      */
1450     std::map<std::string, DBStatus> result;
1451     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result) == OK);
1452 
1453     /**
1454      * @tc.expected: step4. onComplete should be called, check data
1455      */
1456     ASSERT_TRUE(result.size() == devices.size());
1457     for (const auto &pair : result) {
1458         EXPECT_TRUE(pair.second == OK);
1459     }
1460     Value actualValue;
1461     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1462     EXPECT_EQ(actualValue, value);
1463     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1464     EXPECT_EQ(actualValue, value);
1465     /**
1466      * @tc.steps: step5. device B rebuild and put some data
1467      * * @tc.expected: step5. rebuild ok
1468     */
1469     if (g_deviceB != nullptr) {
1470         delete g_deviceB;
1471         g_deviceB = nullptr;
1472     }
1473     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
1474     ASSERT_TRUE(g_deviceB != nullptr);
1475     VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
1476     ASSERT_TRUE(syncInterfaceB != nullptr);
1477     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
1478     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1479     EXPECT_EQ(g_deviceB->PutData(key3, value, currentTime, 0), E_OK);
1480     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1481     EXPECT_EQ(g_deviceB->PutData(key4, value, currentTime, 0), E_OK);
1482     /**
1483      * @tc.steps: step6. sync to device A again and check data
1484      * * @tc.expected: step6. sync ok
1485     */
1486     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1487     EXPECT_EQ(g_kvDelegatePtr->Get(key3, actualValue), OK);
1488     EXPECT_EQ(actualValue, value);
1489     EXPECT_EQ(g_kvDelegatePtr->Get(key4, actualValue), OK);
1490     EXPECT_EQ(actualValue, value);
1491     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1492     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), NOT_FOUND);
1493 }
1494 
1495 /**
1496   * @tc.name: RebuildSync003
1497   * @tc.desc: test clear history data when receive ack
1498   * @tc.type: FUNC
1499   * @tc.require:
1500   * @tc.author: zhuwentao
1501   */
1502 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync003, TestSize.Level1)
1503 {
1504     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1505     /**
1506      * @tc.steps: step1. sync deviceB data to A and check data
1507      * * @tc.expected: step1. interface return ok
1508     */
1509     Key key1 = {'1'};
1510     Key key2 = {'2'};
1511     Key key3 = {'3'};
1512     Key key4 = {'4'};
1513     Value value = {'1'};
1514     EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1515     EXPECT_EQ(g_deviceB->PutData(key2, value, 2u, 0), E_OK); // 2: timestamp
1516     EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
1517     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1518     Value actualValue;
1519     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1520     EXPECT_EQ(actualValue, value);
1521     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1522     EXPECT_EQ(actualValue, value);
1523     VirtualDataItem item;
1524     EXPECT_EQ(g_deviceB->GetData(key3, item), E_OK);
1525     EXPECT_EQ(item.value, value);
1526     /**
1527      * @tc.steps: step2. device B sync to device A,but make it failed
1528      * * @tc.expected: step2. interface return ok
1529     */
1530     EXPECT_EQ(g_deviceB->PutData(key4, value, 3u, 0), E_OK); // 3: timestamp
1531     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_A, DATA_SYNC_MESSAGE);
1532     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1533     /**
1534      * @tc.steps: step3. device B set delay send time
1535      * * @tc.expected: step3. interface return ok
1536     */
1537     std::set<std::string> delayDevice = {DEVICE_B};
1538     g_communicatorAggregator->SetSendDelayInfo(3000u, DATA_SYNC_MESSAGE, 1u, 0u, delayDevice); // delay 3000ms one time
1539     /**
1540      * @tc.steps: step4. device A rebuilt, device B push data to A and set clear remote data mark into context after 1s
1541      * * @tc.expected: step4. interface return ok
1542     */
1543     g_deviceB->SetClearRemoteStaleData(true);
1544     g_mgr.CloseKvStore(g_kvDelegatePtr);
1545     g_kvDelegatePtr = nullptr;
1546     ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1547     KvStoreNbDelegate::Option option;
1548     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1549     ASSERT_TRUE(g_kvDelegateStatus == OK);
1550     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1551     std::map<std::string, DBStatus> result;
1552     std::vector<std::string> devices = {g_deviceB->GetDeviceId()};
1553     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1554     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1555     /**
1556      * @tc.steps: step5. device B sync to A, make it clear history data and check data
1557      * * @tc.expected: step5. interface return ok
1558     */
1559     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1560     EXPECT_EQ(g_deviceB->GetData(key3, item), -E_NOT_FOUND);
1561     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1562     EXPECT_EQ(actualValue, value);
1563     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1564     EXPECT_EQ(actualValue, value);
1565     g_communicatorAggregator->ResetSendDelayInfo();
1566 }
1567 
1568 /**
1569   * @tc.name: RemoveDeviceData001
1570   * @tc.desc: call rekey and removeDeviceData Concurrently
1571   * @tc.type: FUNC
1572   * @tc.require: AR000D487B
1573   * @tc.author: zhuwentao
1574   */
1575 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RemoveDeviceData001, TestSize.Level1)
1576 {
1577     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1578     /**
1579      * @tc.steps: step1. sync deviceB data to A
1580      * * @tc.expected: step1. interface return ok
1581     */
1582     Key key1 = {'1'};
1583     Key key2 = {'2'};
1584     Value value = {'1'};
1585     g_deviceB->PutData(key1, value, 1, 0);
1586     g_deviceB->PutData(key2, value, 2, 0);
1587     g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true);
1588 
1589     Value actualValue;
1590     g_kvDelegatePtr->Get(key1, actualValue);
1591     EXPECT_EQ(actualValue, value);
1592     actualValue.clear();
1593     g_kvDelegatePtr->Get(key2, actualValue);
1594     EXPECT_EQ(actualValue, value);
1595     /**
1596      * @tc.steps: step2. call Rekey and RemoveDeviceData Concurrently
1597      * * @tc.expected: step2. interface return ok
1598     */
__anon7f2dc5a01302() 1599     std::thread thread1([]() {
1600         CipherPassword passwd3;
1601         std::vector<uint8_t> passwdVect = {'p', 's', 'd', 'z'};
1602         passwd3.SetValue(passwdVect.data(), passwdVect.size());
1603         g_kvDelegatePtr->Rekey(passwd3);
1604     });
__anon7f2dc5a01402() 1605     std::thread thread2([]() {
1606         g_kvDelegatePtr->RemoveDeviceData(g_deviceB->GetDeviceId());
1607     });
1608     thread1.join();
1609     thread2.join();
1610 }
1611 
1612 /**
1613   * @tc.name: DeviceOfflineSyncTask001
1614   * @tc.desc: Test sync task when device offline and close db Concurrently
1615   * @tc.type: FUNC
1616   * @tc.require: AR000HI2JS
1617   * @tc.author: zhuwentao
1618   */
1619 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask001, TestSize.Level3)
1620 {
1621     DBStatus status = OK;
1622     std::vector<std::string> devices;
1623     devices.push_back(g_deviceB->GetDeviceId());
1624 
1625     /**
1626      * @tc.steps: step1. deviceA put {k1, v1}
1627      */
1628     Key key = {'1'};
1629     Value value = {'1'};
1630     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1631 
1632     /**
1633      * @tc.steps: step2. deviceA set auto sync and put some key/value
1634      * @tc.expected: step2. interface should return OK.
1635      */
1636     bool autoSync = true;
1637     PragmaData data = static_cast<PragmaData>(&autoSync);
1638     status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1639     ASSERT_EQ(status, OK);
1640 
1641     Key key1 = {'2'};
1642     Key key2 = {'3'};
1643     Key key3 = {'4'};
1644     Key key4 = {'5'};
1645     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1646     ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value) == OK);
1647     ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value) == OK);
1648     ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value) == OK);
1649     ASSERT_TRUE(g_kvDelegatePtr->Put(key4, value) == OK);
1650     /**
1651      * @tc.steps: step3. device offline and close db Concurrently
1652      * @tc.expected: step3. interface should return OK.
1653      */
__anon7f2dc5a01502() 1654     std::thread thread1([]() {
1655         g_mgr.CloseKvStore(g_kvDelegatePtr);
1656         g_kvDelegatePtr = nullptr;
1657     });
__anon7f2dc5a01602() 1658     std::thread thread2([]() {
1659         g_deviceB->Offline();
1660     });
1661     thread1.join();
1662     thread2.join();
1663     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1664     ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1665 }
1666 
1667 /**
1668   * @tc.name: DeviceOfflineSyncTask002
1669   * @tc.desc: Test sync task when autoSync and close db Concurrently
1670   * @tc.type: FUNC
1671   * @tc.require:
1672   * @tc.author: zhuwentao
1673   */
1674 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask002, TestSize.Level3)
1675 {
1676     DBStatus status = OK;
1677     g_deviceC->Offline();
1678 
1679     /**
1680      * @tc.steps: step1. deviceA put {k1, v1}
1681      */
1682     Key key = {'1'};
1683     Value value = {'1'};
1684     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1685 
1686     /**
1687      * @tc.steps: step2. deviceA set auto sync and put some key/value
1688      * @tc.expected: step2. interface should return OK.
1689      */
1690     bool autoSync = true;
1691     PragmaData data = static_cast<PragmaData>(&autoSync);
1692     status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1693     ASSERT_EQ(status, OK);
1694     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME * 2));
1695 
1696     Key key1 = {'2'};
1697     Key key2 = {'3'};
1698     Key key3 = {'4'};
1699     ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value) == OK);
1700     ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value) == OK);
1701     ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value) == OK);
1702     /**
1703      * @tc.steps: step3. close db
1704      * @tc.expected: step3. interface should return OK.
1705      */
1706     g_mgr.CloseKvStore(g_kvDelegatePtr);
1707     g_kvDelegatePtr = nullptr;
1708     ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1709 }
1710 
1711 /**
1712   * @tc.name: DeviceOfflineSyncTask003
1713   * @tc.desc: Test sync task when device offline after call sync
1714   * @tc.type: FUNC
1715   * @tc.require:
1716   * @tc.author: zhuwentao
1717   */
1718 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask003, TestSize.Level3)
1719 {
1720     std::vector<std::string> devices;
1721     devices.push_back(g_deviceB->GetDeviceId());
1722 
1723     /**
1724      * @tc.steps: step1. deviceA put {k1, v1}
1725      */
1726     Key key = {'1'};
1727     Value value = {'1'};
1728     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1729     /**
1730      * @tc.steps: step2. device offline after call sync
1731      * @tc.expected: step2. interface should return OK.
1732      */
1733     Query query = Query::Select().PrefixKey(key);
1734     ASSERT_TRUE(g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, query, false) == OK);
1735     std::this_thread::sleep_for(std::chrono::milliseconds(15)); // wait for 15ms
1736     g_deviceB->Offline();
1737 }
1738 
1739 /**
1740   * @tc.name: GetSyncDataFail001
1741   * @tc.desc: test get sync data failed when sync
1742   * @tc.type: FUNC
1743   * @tc.require:
1744   * @tc.author: zhuwentao
1745   */
1746 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail001, TestSize.Level1)
1747 {
1748     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1749     /**
1750      * @tc.steps: step1. device B set get data errCode control and put some data
1751      * * @tc.expected: step1. interface return ok
1752     */
1753     g_deviceB->SetGetDataErrCode(1, -E_BUSY, true);
1754     Key key1 = {'1'};
1755     Value value = {'1'};
1756     EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1757     /**
1758      * @tc.steps: step2. device B sync to device A and check data
1759      * * @tc.expected: step2. interface return ok
1760     */
1761     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1762     Value actualValue;
1763     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1764     g_deviceB->ResetDataControl();
1765 }
1766 
1767 /**
1768   * @tc.name: GetSyncDataFail002
1769   * @tc.desc: test get sync data failed when sync with large data
1770   * @tc.type: FUNC
1771   * @tc.require: AR000D487B
1772   * @tc.author: zhuwentao
1773   */
1774 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail002, TestSize.Level1)
1775 {
1776     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1777     /**
1778      * @tc.steps: step1. device B set get data errCode control and put some data
1779      * * @tc.expected: step1. interface return ok
1780     */
1781     g_deviceB->SetGetDataErrCode(2, -E_BUSY, true);
1782     int totalSize = 4000u;
1783     std::vector<Entry> entries;
1784     std::vector<Key> keys;
1785     const int keyLen = 10; // 20 Bytes
1786     const int valueLen = 10; // 20 Bytes
1787     DistributedDBUnitTest::GenerateRecords(totalSize, entries, keys, keyLen, valueLen);
1788     uint32_t i = 1u;
1789     for (const auto &entry : entries) {
1790         EXPECT_EQ(g_deviceB->PutData(entry.key, entry.value, i, 0), E_OK);
1791         i++;
1792     }
1793     /**
1794      * @tc.steps: step2. device B sync to device A and check data
1795      * * @tc.expected: step2. interface return ok
1796     */
1797     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1798     std::this_thread::sleep_for(std::chrono::seconds(1));
1799     Value actualValue;
1800     for (int j = 1u; j <= totalSize; j++) {
1801         if (j > totalSize / 2) {
1802             EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), NOT_FOUND);
1803         } else {
1804             EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), OK);
1805         }
1806     }
1807     g_deviceB->ResetDataControl();
1808 }
1809 
1810 /**
1811   * @tc.name: GetSyncDataFail003
1812   * @tc.desc: test get sync data E_EKEYREVOKED failed in push_and_pull sync
1813   * @tc.type: FUNC
1814   * @tc.require:
1815   * @tc.author: zhuwentao
1816   */
1817 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail003, TestSize.Level1)
1818 {
1819     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1820     /**
1821      * @tc.steps: step1. device B set get data errCode control and put some data
1822      * * @tc.expected: step1. interface return ok
1823     */
1824     g_deviceB->SetGetDataErrCode(1, -E_EKEYREVOKED, true);
1825     Key key1 = {'1'};
1826     Key key2 = {'3'};
1827     Value value = {'1'};
1828     EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1829     EXPECT_EQ(g_kvDelegatePtr->Put(key2, value), OK);
1830     /**
1831      * @tc.steps: step2. device B sync to device A and check data
1832      * * @tc.expected: step2. interface return ok
1833     */
1834     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1835     Value actualValue;
1836     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1837     VirtualDataItem item;
1838     EXPECT_EQ(g_deviceB->GetData(key2, item), E_OK);
1839     g_deviceB->ResetDataControl();
1840 }
1841 
1842 /**
1843   * @tc.name: GetSyncDataFail004
1844   * @tc.desc: test get sync data E_EKEYREVOKED failed in push_and_pull sync
1845   * @tc.type: FUNC
1846   * @tc.require:
1847   * @tc.author: zhuwentao
1848   */
1849 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail004, TestSize.Level1)
1850 {
1851     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1852     /**
1853      * @tc.steps: step1. device B set get data errCode control and put some data
1854      * * @tc.expected: step1. interface return ok
1855     */
1856     g_deviceB->SetGetDataErrCode(2, -E_EKEYREVOKED, true);
1857     int totalSize = 4000u;
1858     std::vector<Entry> entries;
1859     std::vector<Key> keys;
1860     const int keyLen = 10; // 20 Bytes
1861     const int valueLen = 10; // 20 Bytes
1862     DistributedDBUnitTest::GenerateRecords(totalSize, entries, keys, keyLen, valueLen);
1863     uint32_t i = 1u;
1864     for (const auto &entry : entries) {
1865         EXPECT_EQ(g_deviceB->PutData(entry.key, entry.value, i, 0), E_OK);
1866         i++;
1867     }
1868     Key key = {'a', 'b', 'c'};
1869     Value value = {'1'};
1870     EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
1871     /**
1872      * @tc.steps: step2. device B sync to device A and check data
1873      * * @tc.expected: step2. interface return ok
1874     */
1875     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1876     std::this_thread::sleep_for(std::chrono::seconds(1));
1877     Value actualValue;
1878     for (int j = 1u; j <= totalSize; j++) {
1879         if (j > totalSize / 2) {
1880             EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), NOT_FOUND);
1881         } else {
1882             EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), OK);
1883         }
1884     }
1885     VirtualDataItem item;
1886     EXPECT_EQ(g_deviceB->GetData(key, item), E_OK);
1887     g_deviceB->ResetDataControl();
1888 }
1889 
1890 /**
1891   * @tc.name: InterceptDataFail001
1892   * @tc.desc: test intercept data failed when sync
1893   * @tc.type: FUNC
1894   * @tc.require:
1895   * @tc.author: zhuwentao
1896   */
1897 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, InterceptDataFail001, TestSize.Level1)
1898 {
1899     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1900     /**
1901      * @tc.steps: step1. device A set intercept data errCode and put some data
1902      * * @tc.expected: step1. interface return ok
1903     */
1904     g_kvDelegatePtr->SetPushDataInterceptor(
__anon7f2dc5a01702(InterceptedData &data, const std::string &sourceID, const std::string &targetID) 1905         [](InterceptedData &data, const std::string &sourceID, const std::string &targetID) {
1906             int errCode = OK;
1907             auto entries = data.GetEntries();
1908             LOGD("====here111,size=%d", entries.size());
1909             for (size_t i = 0; i < entries.size(); i++) {
1910                 Key newKey;
1911                 errCode = data.ModifyKey(i, newKey);
1912                 if (errCode != OK) {
1913                     break;
1914                 }
1915             }
1916             return errCode;
1917         }
1918     );
1919     Key key = {'1'};
1920     Value value = {'1'};
1921     EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
1922     /**
1923      * @tc.steps: step2. device A sync to device B and check data
1924      * * @tc.expected: step2. interface return ok
1925     */
1926     std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
1927     std::map<std::string, DBStatus> result;
1928     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1929     ASSERT_TRUE(result.size() == devices.size());
1930     for (const auto &pair : result) {
1931         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1932         EXPECT_TRUE(pair.second == INTERCEPT_DATA_FAIL);
1933     }
1934     VirtualDataItem item;
1935     EXPECT_EQ(g_deviceB->GetData(key, item), -E_NOT_FOUND);
1936 }
1937 
1938 /**
1939   * @tc.name: UpdateKey001
1940   * @tc.desc: test update key can effect local data and sync data, without delete data
1941   * @tc.type: FUNC
1942   * @tc.require:
1943   * @tc.author: zhangqiquan
1944   */
1945 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, UpdateKey001, TestSize.Level1)
1946 {
1947     /**
1948      * @tc.steps: step1. device A set sync data (k1, v1) local data (k2, v2) (k3, v3) and delete (k4, v4)
1949      * @tc.expected: step1. put data return ok
1950      */
1951     Key k1 = {'k', '1'};
1952     Value v1 = {'v', '1'};
1953     g_deviceB->PutData(k1, v1, 1, 0);
1954     ASSERT_EQ(g_deviceB->Sync(SyncMode::SYNC_MODE_PUSH_ONLY, true), E_OK);
1955     Value actualValue;
1956     EXPECT_EQ(g_kvDelegatePtr->Get(k1, actualValue), OK);
1957     EXPECT_EQ(v1, actualValue);
1958     Key k2 = {'k', '2'};
1959     Value v2 = {'v', '2'};
1960     Key k3 = {'k', '3'};
1961     Value v3 = {'v', '3'};
1962     Key k4 = {'k', '4'};
1963     Value v4 = {'v', '4'};
1964     EXPECT_EQ(g_kvDelegatePtr->Put(k2, v2), OK);
1965     EXPECT_EQ(g_kvDelegatePtr->Put(k3, v3), OK);
1966     EXPECT_EQ(g_kvDelegatePtr->Put(k4, v4), OK);
1967     EXPECT_EQ(g_kvDelegatePtr->Delete(k4), OK);
1968     /**
1969      * @tc.steps: step2. device A update key and set
1970      * @tc.expected: step2. put data return ok
1971      */
__anon7f2dc5a01802(const Key &originKey, Key &newKey) 1972     DBStatus status = g_kvDelegatePtr->UpdateKey([](const Key &originKey, Key &newKey) {
1973         newKey = originKey;
1974         newKey.push_back('0');
1975     });
1976     EXPECT_EQ(status, OK);
1977     k1.push_back('0');
1978     k2.push_back('0');
1979     k3.push_back('0');
1980     EXPECT_EQ(g_kvDelegatePtr->Get(k1, actualValue), OK);
1981     EXPECT_EQ(v1, actualValue);
1982     EXPECT_EQ(g_kvDelegatePtr->Get(k2, actualValue), OK);
1983     EXPECT_EQ(v2, actualValue);
1984     EXPECT_EQ(g_kvDelegatePtr->Get(k3, actualValue), OK);
1985     EXPECT_EQ(v3, actualValue);
1986 }
1987 
1988 /**
1989   * @tc.name: MetaBusy001
1990   * @tc.desc: test sync normal when update water mark busy
1991   * @tc.type: FUNC
1992   * @tc.require:
1993   * @tc.author: zhangqiquan
1994   */
1995 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, MetaBusy001, TestSize.Level1)
1996 {
1997     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1998     Key key = {'1'};
1999     Value value = {'1'};
2000     EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
2001     std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2002     std::map<std::string, DBStatus> result;
2003     ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result), OK);
2004     ASSERT_EQ(result.size(), devices.size());
2005     for (const auto &pair : result) {
2006         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2007         EXPECT_TRUE(pair.second == OK);
2008     }
2009     value = {'2'};
2010     EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
__anon7f2dc5a01902() 2011     g_deviceB->SetSaveDataCallback([] () {
2012         RuntimeContext::GetInstance()->ScheduleTask([]() {
2013             g_deviceB->EraseWaterMark("real_device");
2014         });
2015         std::this_thread::sleep_for(std::chrono::seconds(1));
2016     });
2017     EXPECT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result), OK);
2018     EXPECT_EQ(result.size(), devices.size());
2019     for (const auto &pair : result) {
2020         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2021         EXPECT_TRUE(pair.second == OK);
2022     }
2023     g_deviceB->SetSaveDataCallback(nullptr);
2024     RuntimeContext::GetInstance()->StopTaskPool();
2025 }