• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
#include <gtest/gtest.h>
#include <memory>
#include <thread>
18 
19 #include "db_common.h"
20 #include "db_constant.h"
21 #include "distributeddb_data_generate_unit_test.h"
22 #include "distributeddb_tools_unit_test.h"
23 #include "generic_single_ver_kv_entry.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "platform_specific.h"
27 #include "query.h"
28 #include "query_sync_object.h"
29 #include "single_ver_data_sync.h"
30 #include "single_ver_serialize_manager.h"
31 #include "sync_types.h"
32 #include "virtual_communicator.h"
33 #include "virtual_communicator_aggregator.h"
34 #include "virtual_single_ver_sync_db_Interface.h"
35 
36 using namespace testing::ext;
37 using namespace DistributedDB;
38 using namespace DistributedDBUnitTest;
39 using namespace std;
40 
41 namespace {
42     string g_testDir;
43     const string STORE_ID = "kv_store_sync_test";
44     const string SCHEMA_STORE_ID = "kv_store_sync_schema_test";
45     const std::string DEVICE_B = "deviceB";
46 
47     KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
48     KvStoreDelegateManager g_schemaMgr(SCHEMA_APP_ID, USER_ID);
49     KvStoreConfig g_config;
50     DistributedDBToolsUnitTest g_tool;
51     DBStatus g_kvDelegateStatus = INVALID_ARGS;
52     DBStatus g_schemaKvDelegateStatus = INVALID_ARGS;
53     KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
54     KvStoreNbDelegate* g_schemaKvDelegatePtr = nullptr;
55     VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
56     KvVirtualDevice *g_deviceB = nullptr;
57 
58     // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
59     auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
60         placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
61     auto g_schemaKvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
62         placeholders::_1, placeholders::_2, std::ref(g_schemaKvDelegateStatus), std::ref(g_schemaKvDelegatePtr));
63     const string SCHEMA_STRING =
64     "{\"SCHEMA_VERSION\":\"1.0\","
65     "\"SCHEMA_MODE\":\"STRICT\","
66     "\"SCHEMA_DEFINE\":{"
67     "\"field_name1\":\"BOOL\","
68     "\"field_name2\":\"BOOL\","
69     "\"field_name3\":\"INTEGER, NOT NULL\","
70     "\"field_name4\":\"LONG, DEFAULT 100\","
71     "\"field_name5\":\"DOUBLE, NOT NULL, DEFAULT 3.14\","
72     "\"field_name6\":\"STRING, NOT NULL, DEFAULT '3.1415'\","
73     "\"field_name7\":\"LONG, DEFAULT 100\","
74     "\"field_name8\":\"LONG, DEFAULT 100\","
75     "\"field_name9\":\"LONG, DEFAULT 100\","
76     "\"field_name10\":\"LONG, DEFAULT 100\""
77     "},"
78     "\"SCHEMA_INDEXES\":[\"$.field_name1\", \"$.field_name2\"]}";
79 
80     const std::string SCHEMA_VALUE1 =
81     "{\"field_name1\":true,"
82     "\"field_name2\":false,"
83     "\"field_name3\":10,"
84     "\"field_name4\":20,"
85     "\"field_name5\":3.14,"
86     "\"field_name6\":\"3.1415\","
87     "\"field_name7\":100,"
88     "\"field_name8\":100,"
89     "\"field_name9\":100,"
90     "\"field_name10\":100}";
91 
92     const std::string SCHEMA_VALUE2 =
93     "{\"field_name1\":false,"
94     "\"field_name2\":true,"
95     "\"field_name3\":100,"
96     "\"field_name4\":200,"
97     "\"field_name5\":3.14,"
98     "\"field_name6\":\"3.1415\","
99     "\"field_name7\":100,"
100     "\"field_name8\":100,"
101     "\"field_name9\":100,"
102     "\"field_name10\":100}";
103 }
104 
105 class DistributedDBSingleVerP2PQuerySyncTest : public testing::Test {
106 public:
107     static void SetUpTestCase(void);
108     static void TearDownTestCase(void);
109     void SetUp();
110     void TearDown();
111 };
112 
SetUpTestCase(void)113 void DistributedDBSingleVerP2PQuerySyncTest::SetUpTestCase(void)
114 {
115     /**
116      * @tc.setup: Init datadir and Virtual Communicator.
117      */
118     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
119     string dir = g_testDir + "/single_ver";
120     DIR* dirTmp = opendir(dir.c_str());
121     if (dirTmp == nullptr) {
122         OS::MakeDBDirectory(dir);
123     } else {
124         closedir(dirTmp);
125     }
126 
127     g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
128     ASSERT_TRUE(g_communicatorAggregator != nullptr);
129     RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
130 }
131 
TearDownTestCase(void)132 void DistributedDBSingleVerP2PQuerySyncTest::TearDownTestCase(void)
133 {
134     /**
135      * @tc.teardown: Release virtual Communicator and clear data dir.
136      */
137     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
138         LOGE("rm test db files error!");
139     }
140     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
141 }
142 
SetUp(void)143 void DistributedDBSingleVerP2PQuerySyncTest::SetUp(void)
144 {
145     DistributedDBToolsUnitTest::PrintTestCaseInfo();
146     /**
147      * @tc.setup: create virtual device B and get a KvStoreNbDelegate as deviceA
148      */
149     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
150     ASSERT_TRUE(g_deviceB != nullptr);
151     VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
152     ASSERT_TRUE(syncInterfaceB != nullptr);
153     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
154 }
155 
TearDown(void)156 void DistributedDBSingleVerP2PQuerySyncTest::TearDown(void)
157 {
158     /**
159      * @tc.teardown: Release device A, B
160      */
161     if (g_kvDelegatePtr != nullptr) {
162         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
163         g_kvDelegatePtr = nullptr;
164         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
165         LOGD("delete kv store status %d", status);
166         ASSERT_TRUE(status == OK);
167     }
168     if (g_schemaKvDelegatePtr != nullptr) {
169         ASSERT_EQ(g_schemaMgr.CloseKvStore(g_schemaKvDelegatePtr), OK);
170         g_schemaKvDelegatePtr = nullptr;
171         DBStatus status = g_schemaMgr.DeleteKvStore(SCHEMA_STORE_ID);
172         LOGD("delete kv store status %d", status);
173         ASSERT_TRUE(status == OK);
174     }
175     if (g_deviceB != nullptr) {
176         delete g_deviceB;
177         g_deviceB = nullptr;
178     }
179     PermissionCheckCallbackV2 nullCallback;
180     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
181 }
182 
InitNormalDb()183 void InitNormalDb()
184 {
185     g_config.dataDir = g_testDir;
186     g_mgr.SetKvStoreConfig(g_config);
187     KvStoreNbDelegate::Option option;
188     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
189     ASSERT_TRUE(g_kvDelegateStatus == OK);
190     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
191 }
192 
InitSchemaDb()193 void InitSchemaDb()
194 {
195     g_config.dataDir = g_testDir;
196     g_schemaMgr.SetKvStoreConfig(g_config);
197     KvStoreNbDelegate::Option option;
198     option.schema = SCHEMA_STRING;
199     g_schemaMgr.GetKvStore(SCHEMA_STORE_ID, option, g_schemaKvDelegateCallback);
200     ASSERT_TRUE(g_schemaKvDelegateStatus == OK);
201     ASSERT_TRUE(g_schemaKvDelegatePtr != nullptr);
202 }
203 
204 /**
205  * @tc.name: Normal Sync 001
206  * @tc.desc: Test normal push sync for keyprefix data.
207  * @tc.type: FUNC
208  * @tc.require: AR000FN6G9
209  * @tc.author: xushaohua
210  */
211 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync001, TestSize.Level1)
212 {
213     InitNormalDb();
214     DBStatus status = OK;
215     std::vector<std::string> devices;
216     devices.push_back(g_deviceB->GetDeviceId());
217 
218     /**
219      * @tc.steps: step1. deviceA put {k0, v0} - {k9, v9}
220      */
221     Key key = {'1'};
222     Value value = {'1'};
223     const int dataSize = 10;
224     for (int i = 0; i < dataSize; i++) {
225         key.push_back(i);
226         value.push_back(i);
227         status = g_kvDelegatePtr->Put(key, value);
228         ASSERT_TRUE(status == OK);
229         key.pop_back();
230         value.pop_back();
231     }
232     Key key2 = {'2'};
233     Value value2 = {'2'};
234     status = g_kvDelegatePtr->Put(key2, value2);
235     ASSERT_TRUE(status == OK);
236 
237     /**
238      * @tc.steps: step2. deviceA call query sync and wait
239      * @tc.expected: step2. sync should return OK.
240      */
241     Query query = Query::Select().PrefixKey(key);
242     std::map<std::string, DBStatus> result;
243     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
244     ASSERT_TRUE(status == OK);
245 
246     /**
247      * @tc.expected: step3. onComplete should be called, DeviceB have {k1,v1} - {k9, v9}
248      */
249     ASSERT_TRUE(result.size() == devices.size());
250     for (const auto &pair : result) {
251         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
252         EXPECT_TRUE(pair.second == OK);
253     }
254     VirtualDataItem item;
255     for (int i = 0; i < dataSize; i++) {
256         key.push_back(i);
257         value.push_back(i);
258         g_deviceB->GetData(key, item);
259         EXPECT_TRUE(item.value == value);
260         key.pop_back();
261         value.pop_back();
262     }
263     EXPECT_TRUE(g_deviceB->GetData(key2, item) != E_OK);
264 }
265 
266 /**
267  * @tc.name: Normal Sync 002
268  * @tc.desc: Test normal push sync for limit and offset.
269  * @tc.type: FUNC
270  * @tc.require: AR000FN6G9
271  * @tc.author: xushaohua
272  */
273 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync002, TestSize.Level1)
274 {
275     InitNormalDb();
276     DBStatus status = OK;
277     std::vector<std::string> devices;
278     devices.push_back(g_deviceB->GetDeviceId());
279 
280     /**
281      * @tc.steps: step1. deviceA put {k0, v0} - {k9, v9}
282      */
283     Key key = {'1'};
284     Value value = {'1'};
285     const int dataSize = 10;
286     for (int i = 0; i < dataSize; i++) {
287         key.push_back(i);
288         value.push_back(i);
289         status = g_kvDelegatePtr->Put(key, value);
290         ASSERT_TRUE(status == OK);
291         key.pop_back();
292         value.pop_back();
293     }
294 
295     /**
296      * @tc.steps: step2. deviceA call sync and wait
297      * @tc.expected: step2. sync should return OK.
298      */
299     const int limit = 5;
300     const int offset = 4;
301     Query query = Query::Select().PrefixKey(key).Limit(limit, offset);
302     std::map<std::string, DBStatus> result;
303     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
304     ASSERT_TRUE(status == OK);
305 
306     /**
307      * @tc.expected: step3. onComplete should be called, DeviceB have {k4,v4} {k8, v8}
308      */
309     ASSERT_TRUE(result.size() == devices.size());
310     for (const auto &pair : result) {
311         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
312         EXPECT_TRUE(pair.second == OK);
313     }
314 
315     VirtualDataItem item;
316     for (int i = limit - 1; i < limit + offset; i++) {
317         key.push_back(i);
318         value.push_back(i);
319         g_deviceB->GetData(key, item);
320         EXPECT_TRUE(item.value == value);
321         key.pop_back();
322         value.pop_back();
323     }
324 }
325 
326 /**
327  * @tc.name: Normal Sync 001
328  * @tc.desc: Test normal push_and_pull sync for keyprefix data.
329  * @tc.type: FUNC
330  * @tc.require: AR000FN6G9
331  * @tc.author: zhuwentao
332  */
333 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync003, TestSize.Level1)
334 {
335     InitNormalDb();
336     DBStatus status = OK;
337     std::vector<std::string> devices;
338     devices.push_back(g_deviceB->GetDeviceId());
339 
340     /**
341      * @tc.steps: step1. deviceA put {k, v}, {b, v}
342      */
343     Key key = {'1'};
344     Value value = {'1'};
345     const int dataSize = 10;
346     status = g_kvDelegatePtr->Put(key, value);
347     ASSERT_TRUE(status == OK);
348     Key key2 = {'2'};
349     Value value2 = {'2'};
350     status = g_kvDelegatePtr->Put(key2, value2);
351     ASSERT_TRUE(status == OK);
352 
353     /**
354      * @tc.steps: step2. deviceB put {b0, v0} - {b9, v9}, {c, v}
355      */
356     for (int i = 0; i < dataSize; i++) {
357         key2.push_back(i);
358         value2.push_back(i);
359         g_deviceB->PutData(key2, value2, 10 + i, 0);
360         key2.pop_back();
361         value2.pop_back();
362     }
363     Key key3 = {'3'};
364     Value value3 = {'3'};
365     g_deviceB->PutData(key3, value3, 20, 0);
366 
367     /**
368      * @tc.steps: step2. deviceA call query sync and wait
369      * @tc.expected: step2. sync should return OK.
370      */
371     Query query = Query::Select().PrefixKey(key2);
372     std::map<std::string, DBStatus> result;
373     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, query);
374     ASSERT_TRUE(status == OK);
375 
376     /**
377      * @tc.expected: step3. onComplete should be called, DeviceA have {b0, v0} - {b9, v9}, DeviceB have {b, v}
378      */
379     ASSERT_TRUE(result.size() == devices.size());
380     for (const auto &pair : result) {
381         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
382         EXPECT_TRUE(pair.second == OK);
383     }
384     VirtualDataItem item;
385     Value tmpValue;
386     for (int i = 0; i < dataSize; i++) {
387         key2.push_back(i);
388         value2.push_back(i);
389         g_kvDelegatePtr->Get(key2, tmpValue);
390         EXPECT_TRUE(tmpValue == value2);
391         key2.pop_back();
392         value2.pop_back();
393     }
394     EXPECT_TRUE(g_deviceB->GetData(key, item) != E_OK);
395     EXPECT_TRUE(g_deviceB->GetData(key2, item) == E_OK);
396     g_kvDelegatePtr->Get(key3, tmpValue);
397     EXPECT_TRUE(tmpValue != value3);
398 }
399 
400 /**
401  * @tc.name: Normal Sync 001
402  * @tc.desc: Test normal pull sync for keyprefix data.
403  * @tc.type: FUNC
404  * @tc.require: AR000FN6G9
405  * @tc.author: zhuwentao
406  */
407 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync004, TestSize.Level1)
408 {
409     InitNormalDb();
410     DBStatus status = OK;
411     std::vector<std::string> devices;
412     devices.push_back(g_deviceB->GetDeviceId());
413     /**
414      * @tc.steps: step1. deviceB put {k1, v1} - {k9, k9}, {b0, v0} - {b9, v9}
415      */
416     Key key = {'1'};
417     Value value = {'1'};
418     const int dataSize = 10;
419     Key key2 = {'2'};
420     Value value2 = {'2'};
421     vector<std::pair<Key, Value>> key1Vec;
422     vector<std::pair<Key, Value>> key2Vec;
423     for (int i = 0; i < dataSize; i++) {
424         Key tmpKey(key);
425         Value tmpValue(value);
426         tmpKey.push_back(i);
427         tmpValue.push_back(i);
428         key1Vec.push_back(pair<Key, Value> {tmpKey, tmpValue});
429     }
430     for (int i = 0; i < dataSize; i++) {
431         Key tmpKey(key2);
432         Value tmpValue(value2);
433         tmpKey.push_back(i);
434         tmpValue.push_back(i);
435         key2Vec.push_back(pair<Key, Value> {tmpKey, tmpValue});
436     }
437     for (int i = 0; i < dataSize; i++) {
438         g_deviceB->PutData(key2Vec[i].first, key2Vec[i].second, 20 + i, 0);
439         g_deviceB->PutData(key1Vec[i].first, key1Vec[i].second, 10 + i, 0);
440     }
441 
442     /**
443      * @tc.steps: step2. deviceA call query sync and wait
444      * @tc.expected: step2. sync should return OK.
445      */
446     Query query = Query::Select().PrefixKey(key2);
447     std::map<std::string, DBStatus> result;
448     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
449     ASSERT_TRUE(status == OK);
450 
451     /**
452      * @tc.expected: step3. onComplete should be called, DeviceA have {b0, v0} - {b9, v9}
453      */
454     ASSERT_TRUE(result.size() == devices.size());
455     for (const auto &pair : result) {
456         EXPECT_TRUE(pair.second == OK);
457     }
458     VirtualDataItem item;
459     Value tmpValue;
460     for (int i = 0; i < dataSize; i++) {
461         g_kvDelegatePtr->Get(key2Vec[i].first, tmpValue);
462         EXPECT_TRUE(tmpValue == key2Vec[i].second);
463         g_kvDelegatePtr->Get(key1Vec[i].first, tmpValue);
464         EXPECT_TRUE(tmpValue != key1Vec[i].second);
465     }
466 }
467 
468 /**
469  * @tc.name: NormalSync005
470  * @tc.desc: Test normal push sync for inkeys query.
471  * @tc.type: FUNC
472  * @tc.require: AR000GOHO7
473  * @tc.author: lidongwei
474  */
475 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync005, TestSize.Level1)
476 {
477     InitNormalDb();
478     std::vector<std::string> devices;
479     devices.push_back(g_deviceB->GetDeviceId());
480 
481     /**
482      * @tc.steps: step1. deviceA put K1-K5
483      */
484     ASSERT_EQ(g_kvDelegatePtr->PutBatch(
485         {{KEY_1, VALUE_1}, {KEY_2, VALUE_2}, {KEY_3, VALUE_3}, {KEY_4, VALUE_4}, {KEY_5, VALUE_5}}), OK);
486 
487     /**
488      * @tc.steps: step2. deviceA sync K2,K4 and wait
489      * @tc.expected: step2. sync should return OK.
490      */
491     Query query = Query::Select().InKeys({KEY_2, KEY_4});
492     std::map<std::string, DBStatus> result;
493     ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), OK);
494 
495     /**
496      * @tc.expected: step3. onComplete should be called.
497      */
498     ASSERT_EQ(result.size(), devices.size());
499     for (const auto &pair : result) {
500         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
501         EXPECT_EQ(pair.second, OK);
502     }
503 
504     /**
505      * @tc.steps: step4. deviceB have K2K4 and have no K1K3K5.
506      * @tc.expected: step4. sync should return OK.
507      */
508     VirtualDataItem item;
509     EXPECT_EQ(g_deviceB->GetData(KEY_2, item), E_OK);
510     EXPECT_EQ(item.value, VALUE_2);
511     EXPECT_EQ(g_deviceB->GetData(KEY_4, item), E_OK);
512     EXPECT_EQ(item.value, VALUE_4);
513     EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
514     EXPECT_EQ(g_deviceB->GetData(KEY_3, item), -E_NOT_FOUND);
515     EXPECT_EQ(g_deviceB->GetData(KEY_5, item), -E_NOT_FOUND);
516 
517     /**
518      * @tc.steps: step5. deviceA sync with invalid inkeys query
519      * @tc.expected: step5. sync failed and the rc is right.
520      */
521     query = Query::Select().InKeys({});
522     result.clear();
523     ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), INVALID_ARGS);
524 
525     std::set<Key> keys;
526     for (uint8_t i = 0; i < DBConstant::MAX_BATCH_SIZE + 1; i++) {
527         Key key = { i };
528         keys.emplace(key);
529     }
530     query = Query::Select().InKeys(keys);
531     result.clear();
532     ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), OVER_MAX_LIMITS);
533 
534     query = Query::Select().InKeys({{}});
535     result.clear();
536     ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), INVALID_ARGS);
537 }
538 
539 /**
540  * @tc.name: NormalSync006
541  * @tc.desc: Test normal push sync with query by 32 devices;
542  * @tc.type: FUNC
543  * @tc.require:
544  * @tc.author: zhuwentao
545  */
546 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync006, TestSize.Level1)
547 {
548     /**
549      * @tc.steps: step1. init db and 32 devices
550      */
551     InitNormalDb();
552     uint32_t syncDevCount = 1u;
553     std::vector<KvVirtualDevice *> virtualDeviceVec(syncDevCount, nullptr);
554     const std::string device = "deviceTmp_";
555     std::vector<std::string> devices;
556     bool isError = false;
557     for (uint32_t i = 0; i < syncDevCount; i++) {
558         std::string tmpDev = device + std::to_string(i);
559         virtualDeviceVec[i] = new (std::nothrow) KvVirtualDevice(tmpDev);
560         if (virtualDeviceVec[i] == nullptr) {
561             isError = true;
562             break;
563         }
564         VirtualSingleVerSyncDBInterface *tmpSyncInterface = new (std::nothrow) VirtualSingleVerSyncDBInterface();
565         if (tmpSyncInterface == nullptr) {
566             isError = true;
567             break;
568         }
569         ASSERT_EQ(virtualDeviceVec[i]->Initialize(g_communicatorAggregator, tmpSyncInterface), E_OK);
570         devices.push_back(virtualDeviceVec[i]->GetDeviceId());
571     }
572     if (isError) {
573         for (uint32_t i = 0; i < syncDevCount; i++) {
574             if (virtualDeviceVec[i] != nullptr) {
575                 delete virtualDeviceVec[i];
576                 virtualDeviceVec[i] = nullptr;
577             }
578         }
579         ASSERT_TRUE(false);
580     }
581     /**
582      * @tc.steps: step2. deviceA put {k0, v0}
583      */
584     Key key = {'1'};
585     Value value = {'1'};
586     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
587     /**
588      * @tc.steps: step3. deviceA call query sync and wait
589      * @tc.expected: step3. sync should return OK.
590      */
591     Query query = Query::Select().PrefixKey(key);
592     std::map<std::string, DBStatus> result;
593     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query) == OK);
594 
595     /**
596      * @tc.expected: step3. onComplete should be called, DeviceB have {k1,v1} - {k9, v9}
597      */
598     ASSERT_TRUE(result.size() == devices.size());
599     for (const auto &pair : result) {
600         EXPECT_TRUE(pair.second == OK);
601     }
602     VirtualDataItem item;
603     for (uint32_t i = 0; i < syncDevCount; i++) {
604         EXPECT_TRUE(virtualDeviceVec[i]->GetData(key, item) == E_OK);
605         EXPECT_EQ(item.value, value);
606         delete virtualDeviceVec[i];
607         virtualDeviceVec[i] = nullptr;
608     }
609 }
610 
611 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, QueryRequestPacketTest001, TestSize.Level1)
612 {
613     /**
614      * @tc.steps: step1. prepare a QuerySyncRequestPacket.
615      */
616     auto packet = new (std::nothrow) DataRequestPacket;
617     ASSERT_TRUE(packet != nullptr);
618     auto kvEntry = new (std::nothrow) GenericSingleVerKvEntry;
619     ASSERT_TRUE(kvEntry != nullptr);
620     kvEntry->SetTimestamp(1);
621     SyncEntry syncData {.entries = {kvEntry}};
622 #ifndef OMIT_ZLIB
623     ASSERT_TRUE(GenericSingleVerKvEntry::Compress(syncData.entries, syncData.compressedEntries,
624         {CompressAlgorithm::ZLIB, SOFTWARE_VERSION_CURRENT}) == E_OK);
625     packet->SetCompressAlgo(CompressAlgorithm::ZLIB);
626     packet->SetFlag(4); // set IS_COMPRESS_DATA flag true
627 #endif
628     packet->SetBasicInfo(-E_NOT_SUPPORT, SOFTWARE_VERSION_CURRENT, SyncModeType::QUERY_PUSH_PULL);
629     packet->SetData(syncData.entries);
630     packet->SetCompressData(syncData.compressedEntries);
631     packet->SetEndWaterMark(INT8_MAX);
632     packet->SetWaterMark(INT16_MAX, INT32_MAX, INT64_MAX);
633     QuerySyncObject syncQuery(Query::Select().PrefixKey({'2'}));
634     packet->SetQuery(syncQuery);
635     packet->SetQueryId(syncQuery.GetIdentify());
636     packet->SetReserved(std::vector<uint64_t> {INT8_MAX});
637 
638     /**
639      * @tc.steps: step2. put the QuerySyncRequestPacket into a message.
640      */
641     Message msg;
642     msg.SetExternalObject(packet);
643     msg.SetMessageId(QUERY_SYNC_MESSAGE);
644     msg.SetMessageType(TYPE_REQUEST);
645 
646     /**
647      * @tc.steps: step3. Serialization the message to a buffer.
648      */
649     int len = SingleVerSerializeManager::CalculateLen(&msg);
650     vector<uint8_t> buffer(len);
651     ASSERT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), &msg), E_OK);
652 
653     /**
654      * @tc.steps: step4. DeSerialization the buffer to a message.
655      */
656     Message outMsg(QUERY_SYNC_MESSAGE);
657     outMsg.SetMessageType(TYPE_REQUEST);
658     ASSERT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), &outMsg), E_OK);
659 
660     /**
661      * @tc.steps: step5. checkout the outMsg.
662      * @tc.expected: step5. outMsg equal the the in msg
663      */
664     auto outPacket = outMsg.GetObject<DataRequestPacket>();
665     EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
666     EXPECT_EQ(outPacket->GetMode(), SyncModeType::QUERY_PUSH_PULL);
667     EXPECT_EQ(outPacket->GetEndWaterMark(), static_cast<uint64_t>(INT8_MAX));
668     EXPECT_EQ(outPacket->GetLocalWaterMark(), static_cast<uint64_t>(INT16_MAX));
669     EXPECT_EQ(outPacket->GetPeerWaterMark(), static_cast<uint64_t>(INT32_MAX));
670     EXPECT_EQ(outPacket->GetDeletedWaterMark(), static_cast<uint64_t>(INT64_MAX));
671 #ifndef OMIT_ZLIB
672     EXPECT_EQ(outPacket->GetFlag(), static_cast<uint32_t>(4)); // check IS_COMPRESS_DATA flag true
673 #endif
674     EXPECT_EQ(outPacket->GetQueryId(), syncQuery.GetIdentify());
675     EXPECT_EQ(outPacket->GetReserved(), std::vector<uint64_t> {INT8_MAX});
676     EXPECT_EQ(outPacket->GetSendCode(), -E_NOT_SUPPORT);
677     EXPECT_EQ(outPacket->GetData()[0]->GetTimestamp(), 1u);
678 }
679 
680 /**
681  * @tc.name: QueryRequestPacketTest002
682  * @tc.desc: Test exception branch of serialization.
683  * @tc.type: FUNC
684  * @tc.require:
685  * @tc.author: zhangshijie
686  */
687 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, SerializationManager001, TestSize.Level1)
688 {
689     /**
690      * @tc.steps: step1. call SingleVerSerializeManager::Serialization with buffer = nullptr or msg = nullptr
691      * @tc.expected:step1 return -E_MESSAGE_ID_ERROR
692      */
693     Message msg;
694     msg.SetMessageType(TYPE_INVALID);
695     vector<uint8_t> buffer(10); // 10 is test buffer len
696     EXPECT_EQ(SingleVerSerializeManager::Serialization(nullptr, buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
697     EXPECT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), nullptr), -E_MESSAGE_ID_ERROR);
698 
699     /**
700      * @tc.steps: step2. call SingleVerSerializeManager::Serialization with invalid type message
701      * @tc.expected:step2 return -E_MESSAGE_ID_ERROR
702      */
703     EXPECT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
704 
705     /**
706      * @tc.steps: step3. call SingleVerSerializeManager::DeSerialization with buffer = nullptr or msg = nullptr
707      * @tc.expected:step3 return -E_MESSAGE_ID_ERROR
708      */
709     EXPECT_EQ(SingleVerSerializeManager::DeSerialization(nullptr, buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
710     EXPECT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), nullptr), -E_MESSAGE_ID_ERROR);
711 
712     /**
713      * @tc.steps: step4. call SingleVerSerializeManager::DeSerialization with invalid type message
714      * @tc.expected:step4 return -E_MESSAGE_ID_ERROR
715      */
716     EXPECT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
717 }
718 
719 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, QueryAckPacketTest001, TestSize.Level1)
720 {
721     /**
722      * @tc.steps: step1. prepare a QuerySyncAckPacket.
723      */
724     DataAckPacket packet;
725     packet.SetVersion(SOFTWARE_VERSION_CURRENT);
726     packet.SetData(INT64_MAX);
727     packet.SetRecvCode(-E_NOT_SUPPORT);
728     std::vector<uint64_t> reserved = {INT8_MAX};
729     packet.SetReserved(reserved);
730 
731     /**
732      * @tc.steps: step2. put the QuerySyncAckPacket into a message.
733      */
734     Message msg;
735     msg.SetCopiedObject(packet);
736     msg.SetMessageId(QUERY_SYNC_MESSAGE);
737     msg.SetMessageType(TYPE_RESPONSE);
738 
739     /**
740      * @tc.steps: step3. Serialization the message to a buffer.
741      */
742     int len = SingleVerSerializeManager::CalculateLen(&msg);
743     LOGE("test leng = %d", len);
744     uint8_t *buffer = new (nothrow) uint8_t[len];
745     ASSERT_TRUE(buffer != nullptr);
746     int errCode = SingleVerSerializeManager::Serialization(buffer, len, &msg);
747     ASSERT_EQ(errCode, E_OK);
748 
749     /**
750      * @tc.steps: step4. DeSerialization the buffer to a message.
751      */
752     Message outMsg;
753     outMsg.SetMessageId(QUERY_SYNC_MESSAGE);
754     outMsg.SetMessageType(TYPE_RESPONSE);
755     errCode = SingleVerSerializeManager::DeSerialization(buffer, len, &outMsg);
756     ASSERT_EQ(errCode, E_OK);
757 
758     /**
759      * @tc.steps: step5. checkout the outMsg.
760      * @tc.expected: step5. outMsg equal the the in msg
761      */
762     auto outPacket = outMsg.GetObject<DataAckPacket>();
763     EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
764     EXPECT_EQ(outPacket->GetData(), static_cast<uint64_t>(INT64_MAX));
765     std::vector<uint64_t> reserved2 = {INT8_MAX};
766     EXPECT_EQ(outPacket->GetReserved(), reserved2);
767     EXPECT_EQ(outPacket->GetRecvCode(), -E_NOT_SUPPORT);
768     delete[] buffer;
769 }
770 
771 /**
772  * @tc.name: GetQueryWaterMark 001
773  * @tc.desc: Test metaData save and get queryWaterMark.
774  * @tc.type: FUNC
775  * @tc.require: AR000FN6G9
776  * @tc.author: zhangqiquan
777  */
778 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark001, TestSize.Level1)
779 {
780     VirtualSingleVerSyncDBInterface storage;
781     Metadata meta;
782 
783     /**
784      * @tc.steps: step1. initialize meta with storage
785      * @tc.expected: step1. E_OK
786      */
787     int errCode = meta.Initialize(&storage);
788     ASSERT_EQ(errCode, E_OK);
789 
790     /**
791      * @tc.steps: step2. save receive and send watermark
792      * @tc.expected: step2. E_OK
793      */
794     WaterMark w1 = 1;
795     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w1), E_OK);
796     EXPECT_EQ(meta.SetSendQueryWaterMark("Q1", "D1", w1), E_OK);
797 
798     /**
799      * @tc.steps: step3. get receive and send watermark
800      * @tc.expected: step3. E_OK and get the latest value
801      */
802     WaterMark w = 0;
803     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w), E_OK);
804     EXPECT_EQ(w1, w);
805     EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", w), E_OK);
806     EXPECT_EQ(w1, w);
807 
808     /**
809      * @tc.steps: step4. set peer and local watermark
810      * @tc.expected: step4. E_OK
811      */
812     WaterMark w2 = 2;
813     EXPECT_EQ(meta.SaveLocalWaterMark("D1", w2), E_OK);
814     EXPECT_EQ(meta.SavePeerWaterMark("D1", w2, true), E_OK);
815 
816     /**
817      * @tc.steps: step5. get receive and send watermark
818      * @tc.expected: step5. E_OK and get the w1
819      */
820     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w), E_OK);
821     EXPECT_EQ(w2, w);
822     EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", w), E_OK);
823     EXPECT_EQ(w2, w);
824 
825     /**
826      * @tc.steps: step6. set peer and local watermark
827      * @tc.expected: step6. E_OK
828      */
829     WaterMark w3 = 3;
830     EXPECT_EQ(meta.SaveLocalWaterMark("D2", w3), E_OK);
831     EXPECT_EQ(meta.SavePeerWaterMark("D2", w3, true), E_OK);
832 
833     /**
834      * @tc.steps: step7. get receive and send watermark
835      * @tc.expected: step7. E_OK and get the w3
836      */
837     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q2", "D2", w), E_OK);
838     EXPECT_EQ(w3, w);
839     EXPECT_EQ(meta.GetSendQueryWaterMark("Q2", "D2", w), E_OK);
840     EXPECT_EQ(w3, w);
841 
842     /**
843      * @tc.steps: step8. get not exit receive and send watermark
844      * @tc.expected: step8. E_OK and get the 0
845      */
846     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q3", "D3", w), E_OK);
847     EXPECT_EQ(w, 0u);
848     EXPECT_EQ(meta.GetSendQueryWaterMark("Q3", "D3", w), E_OK);
849     EXPECT_EQ(w, 0u);
850 }
851 
852 /**
853  * @tc.name: GetQueryWaterMark 002
854  * @tc.desc: Test metaData save and get queryWaterMark after push or pull mode.
855  * @tc.type: FUNC
856  * @tc.require: AR000FN6G9
857  * @tc.author: zhangqiquan
858  */
859 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark002, TestSize.Level1)
860 {
861     VirtualSingleVerSyncDBInterface storage;
862     Metadata meta;
863 
864     /**
865      * @tc.steps: step1. initialize meta with storage
866      * @tc.expected: step1. E_OK
867      */
868     int errCode = meta.Initialize(&storage);
869     ASSERT_EQ(errCode, E_OK);
870 
871     /**
872      * @tc.steps: step2. set peer and local watermark
873      * @tc.expected: step2. E_OK
874      */
875     WaterMark w1 = 2;
876     EXPECT_EQ(meta.SaveLocalWaterMark("D1", w1), E_OK);
877     EXPECT_EQ(meta.SavePeerWaterMark("D1", w1, true), E_OK);
878 
879     /**
880      * @tc.steps: step2. save receive and send watermark
881      * @tc.expected: step2. E_OK
882      */
883     WaterMark w2 = 1;
884     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w2), E_OK);
885     EXPECT_EQ(meta.SetSendQueryWaterMark("Q1", "D1", w2), E_OK);
886 
887     /**
888      * @tc.steps: step3. get receive and send watermark
889      * @tc.expected: step3. E_OK and get the bigger value
890      */
891     WaterMark w = 0;
892     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w), E_OK);
893     EXPECT_EQ(w1, w);
894     EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", w), E_OK);
895     EXPECT_EQ(w1, w);
896 }
897 
898 /**
899  * @tc.name: ClearQueryWaterMark 001
900  * @tc.desc: Test metaData clear watermark function.
901  * @tc.type: FUNC
902  * @tc.require: AR000FN6G9
903  * @tc.author: zhangqiquan
904  */
905 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark001, TestSize.Level1)
906 {
907     VirtualSingleVerSyncDBInterface storage;
908     Metadata meta;
909 
910     /**
911      * @tc.steps: step1. initialize meta with storage
912      * @tc.expected: step1. E_OK
913      */
914     int errCode = meta.Initialize(&storage);
915     ASSERT_EQ(errCode, E_OK);
916 
917     /**
918      * @tc.steps: step2. save receive watermark
919      * @tc.expected: step2. E_OK
920      */
921     WaterMark w1 = 1;
922     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w1), E_OK);
923 
924     /**
925      * @tc.steps: step3. erase peer watermark
926      * @tc.expected: step3. E_OK
927      */
928     EXPECT_EQ(meta.EraseDeviceWaterMark("D1", true), E_OK);
929 
930     /**
931      * @tc.steps: step4. get receive watermark
932      * @tc.expected: step4. E_OK receive watermark is zero
933      */
934     WaterMark w2 = -1;
935     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w2), E_OK);
936     EXPECT_EQ(w2, 0u);
937 
938     /**
939      * @tc.steps: step5. set peer watermark
940      * @tc.expected: step5. E_OK
941      */
942     WaterMark w3 = 2;
943     EXPECT_EQ(meta.SavePeerWaterMark("D1", w3, true), E_OK);
944 
945     /**
946      * @tc.steps: step6. get receive watermark
947      * @tc.expected: step6. E_OK receive watermark is peer watermark
948      */
949     WaterMark w4 = -1;
950     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w4), E_OK);
951     EXPECT_EQ(w4, w3);
952 }
953 
954 /**
955  * @tc.name: ClearQueryWaterMark 002
956  * @tc.desc: Test metaData clear watermark function.
957  * @tc.type: FUNC
958  * @tc.require: AR000FN6G9
959  * @tc.author: zhangqiquan
960  */
961 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark002, TestSize.Level1)
962 {
963     VirtualSingleVerSyncDBInterface storage;
964     Metadata meta;
965 
966     /**
967      * @tc.steps: step1. initialize meta with storage
968      * @tc.expected: step1. E_OK
969      */
970     int errCode = meta.Initialize(&storage);
971     ASSERT_EQ(errCode, E_OK);
972 
973     /**
974      * @tc.steps: step2. save receive watermark
975      * @tc.expected: step2. E_OK
976      */
977     WaterMark w1 = 1;
978     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w1), E_OK);
979     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q2", "D1", w1), E_OK);
980     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D2", w1), E_OK);
981 
982     /**
983      * @tc.steps: step3. erase peer watermark, make sure data remove in db
984      * @tc.expected: step3. E_OK
985      */
986     Metadata anotherMeta;
987     ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
988     EXPECT_EQ(anotherMeta.EraseDeviceWaterMark("D1", true), E_OK);
989 
990     /**
991      * @tc.steps: step4. get receive watermark
992      * @tc.expected: step4. E_OK receive watermark is zero
993      */
994     WaterMark w2 = -1;
995     EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q1", "D1", w2), E_OK);
996     EXPECT_EQ(w2, 0u);
997     w2 = -1;
998     EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q2", "D1", w2), E_OK);
999     EXPECT_EQ(w2, 0u);
1000     w2 = -1;
1001     EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q1", "D2", w2), E_OK);
1002     EXPECT_EQ(w2, w1);
1003 }
1004 
1005 /**
1006  * @tc.name: GetQueryLastTimestamp001
1007  * @tc.desc: Test function of GetQueryLastTimestamp.
1008  * @tc.type: FUNC
1009  * @tc.require: AR000FN6G9
1010  * @tc.author: zhangshijie
1011  */
1012 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryLastTimestamp001, TestSize.Level1)
1013 {
1014     /**
1015      * @tc.steps: step1. initialize meta with nullptr
1016      * @tc.expected: step1. return -E_INVALID_DB
1017      */
1018     Metadata meta;
1019     EXPECT_EQ(meta.Initialize(nullptr), -E_INVALID_DB);
1020 
1021     /**
1022      * @tc.steps: step2. initialize meta with storage
1023      * @tc.expected: step2. E_OK
1024      */
1025     VirtualSingleVerSyncDBInterface storage;
1026     int errCode = meta.Initialize(&storage);
1027     ASSERT_EQ(errCode, E_OK);
1028 
1029     /**
1030      * @tc.steps: step3. call GetQueryLastTimestamp with a non-exists device
1031      * @tc.expected: step3. return INT64_MAX
1032      */
1033     EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q1"), static_cast<uint64_t>(INT64_MAX));
1034 
1035     /**
1036      * @tc.steps: step4. call GetQueryLastTimestamp with device D1 again
1037      * @tc.expected: step4. return 0
1038      */
1039     EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q1"), 0u);
1040 
1041     /**
1042      * @tc.steps: step5. call GetQueryLastTimestamp with device D1 and Q2
1043      * @tc.expected: step5. return INT64_MAX
1044      */
1045     EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q2"), static_cast<uint64_t>(INT64_MAX));
1046 }
1047 
1048 /**
1049  * @tc.name: MetaDataExceptionBranch001
1050  * @tc.desc: Test execption branch of meata data.
1051  * @tc.type: FUNC
1052  * @tc.require: AR000FN6G9
1053  * @tc.author: zhangshijie
1054  */
1055 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, MetaDataExceptionBranch001, TestSize.Level1)
1056 {
1057     /**
1058      * @tc.steps: step1. call GetRemoveDataMark with a device not in map
1059      * @tc.expected: step1. out value = 0
1060      */
1061     Metadata meta;
1062     uint64_t val = 99; // 99 is the initial value of outValue
1063     uint64_t outValue = val;
1064     meta.GetRemoveDataMark("D1", outValue);
1065     EXPECT_EQ(outValue, 0u);
1066 
1067     /**
1068      * @tc.steps: step2. reset outValue, call GetDbCreateTime with a device not in map
1069      * @tc.expected: step2. out value = 0
1070      */
1071     outValue = val;
1072     meta.GetDbCreateTime("D1", outValue);
1073     EXPECT_EQ(outValue, 0u);
1074 
1075     /**
1076      * @tc.steps: step3. call ResetMetaDataAfterRemoveData with a device not in map
1077      * @tc.expected: step3. return -E_NOT_FOUND
1078      */
1079     EXPECT_EQ(meta.ResetMetaDataAfterRemoveData("D1"), -E_NOT_FOUND);
1080 }
1081 
1082 /**
1083  * @tc.name: GetDeleteKeyWaterMark 001
1084  * @tc.desc: Test metaData save and get deleteWaterMark.
1085  * @tc.type: FUNC
1086  * @tc.require: AR000FN6G9
1087  * @tc.author: zhangqiquan
1088  */
1089 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteKeyWaterMark001, TestSize.Level1)
1090 {
1091     VirtualSingleVerSyncDBInterface storage;
1092     Metadata meta;
1093 
1094     /**
1095      * @tc.steps: step1. initialize meta with storage
1096      * @tc.expected: step1. E_OK
1097      */
1098     int errCode = meta.Initialize(&storage);
1099     ASSERT_EQ(errCode, E_OK);
1100 
1101     /**
1102      * @tc.steps: step2. save receive and send watermark
1103      * @tc.expected: step2. E_OK
1104      */
1105     WaterMark w1 = 1;
1106     EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", w1), E_OK);
1107     EXPECT_EQ(meta.SetSendDeleteSyncWaterMark("D1", w1), E_OK);
1108 
1109     /**
1110      * @tc.steps: step3. get receive and send watermark
1111      * @tc.expected: step3. E_OK and get the latest value
1112      */
1113     WaterMark w = 0;
1114     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w), E_OK);
1115     EXPECT_EQ(w1, w);
1116     EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", w), E_OK);
1117     EXPECT_EQ(w1, w);
1118 
1119     /**
1120      * @tc.steps: step4. set peer and local watermark
1121      * @tc.expected: step4. E_OK
1122      */
1123     WaterMark w2 = 2;
1124     EXPECT_EQ(meta.SaveLocalWaterMark("D1", w2), E_OK);
1125     EXPECT_EQ(meta.SavePeerWaterMark("D1", w2, true), E_OK);
1126 
1127     /**
1128      * @tc.steps: step5. get receive and send watermark
1129      * @tc.expected: step5. E_OK and get the w1
1130      */
1131     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w), E_OK);
1132     EXPECT_EQ(w2, w);
1133     EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", w), E_OK);
1134     EXPECT_EQ(w2, w);
1135 
1136     /**
1137      * @tc.steps: step6. set peer and local watermark
1138      * @tc.expected: step6. E_OK
1139      */
1140     WaterMark w3 = 3;
1141     EXPECT_EQ(meta.SaveLocalWaterMark("D2", w3), E_OK);
1142     EXPECT_EQ(meta.SavePeerWaterMark("D2", w3, true), E_OK);
1143 
1144     /**
1145      * @tc.steps: step7. get receive and send watermark
1146      * @tc.expected: step7. E_OK and get the w3
1147      */
1148     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D2", w), E_OK);
1149     EXPECT_EQ(w3, w);
1150     EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D2", w), E_OK);
1151     EXPECT_EQ(w3, w);
1152 
1153     /**
1154      * @tc.steps: step8. get not exit receive and send watermark
1155      * @tc.expected: step8. E_OK and get the 0
1156      */
1157     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D3", w), E_OK);
1158     EXPECT_EQ(w, 0u);
1159     EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D3", w), E_OK);
1160     EXPECT_EQ(w, 0u);
1161 }
1162 
1163 /**
1164  * @tc.name: GetDeleteKeyWaterMark 002
1165  * @tc.desc: Test metaData save and get deleteWaterMark after push or pull mode.
1166  * @tc.type: FUNC
1167  * @tc.require: AR000FN6G9
1168  * @tc.author: zhangqiquan
1169  */
1170 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteKeyWaterMark002, TestSize.Level1)
1171 {
1172     VirtualSingleVerSyncDBInterface storage;
1173     Metadata meta;
1174 
1175     /**
1176      * @tc.steps: step1. initialize meta with storage
1177      * @tc.expected: step1. E_OK
1178      */
1179     int errCode = meta.Initialize(&storage);
1180     ASSERT_EQ(errCode, E_OK);
1181 
1182     /**
1183      * @tc.steps: step2. set peer and local watermark
1184      * @tc.expected: step2. E_OK
1185      */
1186     WaterMark w1 = 3;
1187     EXPECT_EQ(meta.SaveLocalWaterMark("D1", w1), E_OK);
1188     EXPECT_EQ(meta.SavePeerWaterMark("D1", w1, true), E_OK);
1189 
1190     /**
1191      * @tc.steps: step2. save receive and send watermark
1192      * @tc.expected: step2. E_OK
1193      */
1194     WaterMark w2 = 1;
1195     EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", w2), E_OK);
1196     EXPECT_EQ(meta.SetSendDeleteSyncWaterMark("D1", w2), E_OK);
1197 
1198     /**
1199      * @tc.steps: step3. get receive and send watermark
1200      * @tc.expected: step3. E_OK and get the bigger value
1201      */
1202     WaterMark w = 0;
1203     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w), E_OK);
1204     EXPECT_EQ(w1, w);
1205     EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", w), E_OK);
1206     EXPECT_EQ(w1, w);
1207 }
1208 
1209 /**
1210  * @tc.name: ClearDeleteKeyWaterMark 001
1211  * @tc.desc: Test metaData clear watermark function.
1212  * @tc.type: FUNC
1213  * @tc.require: AR000FN6G9
1214  * @tc.author: zhangqiquan
1215  */
1216 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearDeleteKeyWaterMark001, TestSize.Level1)
1217 {
1218     VirtualSingleVerSyncDBInterface storage;
1219     Metadata meta;
1220 
1221     /**
1222      * @tc.steps: step1. initialize meta with storage
1223      * @tc.expected: step1. E_OK
1224      */
1225     int errCode = meta.Initialize(&storage);
1226     ASSERT_EQ(errCode, E_OK);
1227 
1228     /**
1229      * @tc.steps: step2. save receive watermark
1230      * @tc.expected: step2. E_OK
1231      */
1232     WaterMark w1 = 1;
1233     EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", w1), E_OK);
1234 
1235     /**
1236      * @tc.steps: step3. erase peer watermark
1237      * @tc.expected: step3. E_OK
1238      */
1239     EXPECT_EQ(meta.EraseDeviceWaterMark("D1", true), E_OK);
1240 
1241     /**
1242      * @tc.steps: step4. get receive watermark
1243      * @tc.expected: step4. E_OK receive watermark is zero
1244      */
1245     WaterMark w2 = -1;
1246     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w2), E_OK);
1247     EXPECT_EQ(w2, 0u);
1248 
1249     /**
1250      * @tc.steps: step5. set peer watermark
1251      * @tc.expected: step5. E_OK
1252      */
1253     WaterMark w3 = 2;
1254     EXPECT_EQ(meta.SavePeerWaterMark("D1", w3, true), E_OK);
1255 
1256     /**
1257      * @tc.steps: step6. get receive watermark
1258      * @tc.expected: step6. E_OK receive watermark is peer watermark
1259      */
1260     WaterMark w4 = -1;
1261     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w4), E_OK);
1262     EXPECT_EQ(w4, w3);
1263 }
1264 
1265 /**
1266  * @tc.name: VerifyCacheAndDb 001
1267  * @tc.desc: Test metaData watermark cache and db are consistent and correct.
1268  * @tc.type: FUNC
1269  * @tc.require: AR000FN6G9
1270  * @tc.author: zhangqiquan
1271  */
1272 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDataQuerySync001, TestSize.Level1)
1273 {
1274     Metadata meta;
1275     VirtualSingleVerSyncDBInterface storage;
1276 
1277     /**
1278      * @tc.steps: step1. initialize meta with storage
1279      * @tc.expected: step1. E_OK
1280      */
1281     int errCode = meta.Initialize(&storage);
1282     ASSERT_EQ(errCode, E_OK);
1283 
1284     const std::string deviceId = "D1";
1285     const std::string queryId = "Q1";
1286 
1287     /**
1288     * @tc.steps: step2. save deleteSync watermark
1289     * @tc.expected: step2. E_OK
1290     */
1291     WaterMark deleteWaterMark = 1;
1292     EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark(deviceId, deleteWaterMark), E_OK);
1293     EXPECT_EQ(meta.SetSendDeleteSyncWaterMark(deviceId, deleteWaterMark), E_OK);
1294 
1295     /**
1296     * @tc.steps: step3. save querySync watermark
1297     * @tc.expected: step2. E_OK
1298     */
1299     WaterMark queryWaterMark = 2;
1300     EXPECT_EQ(meta.SetRecvQueryWaterMark(queryId, deviceId, queryWaterMark), E_OK);
1301     EXPECT_EQ(meta.SetSendQueryWaterMark(queryId, deviceId, queryWaterMark), E_OK);
1302 
1303     /**
1304     * @tc.steps: step4. initialize meta with storage
1305     * @tc.expected: step4. E_OK
1306     */
1307     Metadata anotherMeta;
1308     ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1309 
1310     /**
1311     * @tc.steps: step5. verify delete sync data
1312     * @tc.expected: step5. E_OK and waterMark equal to deleteWaterMark
1313     */
1314     WaterMark waterMark;
1315     EXPECT_EQ(anotherMeta.GetRecvDeleteSyncWaterMark(deviceId, waterMark), E_OK);
1316     EXPECT_EQ(waterMark, deleteWaterMark);
1317     EXPECT_EQ(anotherMeta.GetSendDeleteSyncWaterMark(deviceId, waterMark), E_OK);
1318     EXPECT_EQ(waterMark, deleteWaterMark);
1319 
1320     /**
1321     * @tc.steps: step6. verify query sync data
1322     * @tc.expected: step6. E_OK and waterMark equal to queryWaterMark
1323     */
1324     EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark(queryId, deviceId, waterMark), E_OK);
1325     EXPECT_EQ(waterMark, queryWaterMark);
1326     EXPECT_EQ(anotherMeta.GetSendQueryWaterMark(queryId, deviceId, waterMark), E_OK);
1327     EXPECT_EQ(waterMark, queryWaterMark);
1328 }
1329 
1330 /**
1331  * @tc.name: VerifyLruMap 001
1332  * @tc.desc: Test metaData watermark cache lru ability.
1333  * @tc.type: FUNC
1334  * @tc.require: AR000FN6G9
1335  * @tc.author: zhangqiquan
1336  */
1337 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyLruMap001, TestSize.Level1)
1338 {
1339     LruMap<std::string, QueryWaterMark> lruMap;
1340     const int maxCacheItems = 200;
1341 
1342     /**
1343     * @tc.steps: step1. fill items to LruMap
1344     * @tc.expected: step1. E_OK
1345     */
1346     const int startCount = 0;
1347     for (int i = startCount; i < maxCacheItems; i++) {
1348         std::string key = std::to_string(i);
1349         QueryWaterMark value;
1350         value.recvWaterMark = i + 1;
1351         EXPECT_EQ(lruMap.Put(key, value), E_OK);
1352     }
1353 
1354     /**
1355     * @tc.steps: step2. get the first item
1356     * @tc.expected: step2. E_OK first item will move to last
1357     */
1358     std::string firstItemKey = std::to_string(startCount);
1359     QueryWaterMark firstItemValue;
1360     EXPECT_EQ(lruMap.Get(firstItemKey, firstItemValue), E_OK);
1361     EXPECT_EQ(firstItemValue.recvWaterMark, 1u);
1362 
1363     /**
1364     * @tc.steps: step3. insert new items to LruMap
1365     * @tc.expected: step3. the second items was removed
1366     */
1367     std::string key = std::to_string(maxCacheItems);
1368     QueryWaterMark value;
1369     value.recvWaterMark = maxCacheItems;
1370     EXPECT_EQ(lruMap.Put(key, value), E_OK);
1371 
1372     /**
1373     * @tc.steps: step4. get the second item
1374     * @tc.expected: step4. E_NOT_FOUND it was removed by algorithm
1375     */
1376     std::string secondItemKey = std::to_string(startCount + 1);
1377     QueryWaterMark secondItemValue;
1378     EXPECT_EQ(lruMap.Get(secondItemKey, secondItemValue), -E_NOT_FOUND);
1379     EXPECT_EQ(secondItemValue.recvWaterMark, 0u);
1380 }
1381 
1382 /**
1383  * @tc.name: VerifyMetaDataInit 001
1384  * @tc.desc: Test metaData init correctly
1385  * @tc.type: FUNC
1386  * @tc.require: AR000FN6G9
1387  * @tc.author: zhangqiquan
1388  */
1389 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDataInit001, TestSize.Level1)
1390 {
1391     Metadata meta;
1392     VirtualSingleVerSyncDBInterface storage;
1393 
1394     /**
1395     * @tc.steps: step1. initialize meta with storage
1396     * @tc.expected: step1. E_OK
1397     */
1398     ASSERT_EQ(meta.Initialize(&storage), E_OK);
1399 
1400     DeviceID deviceA = "DeviceA";
1401     DeviceID deviceB = "DeviceA";
1402     WaterMark setWaterMark = 1;
1403 
1404     /**
1405     * @tc.steps: step2. meta save and get waterMark
1406     * @tc.expected: step2. expect get the same waterMark
1407     */
1408     EXPECT_EQ(meta.SaveLocalWaterMark(deviceA, setWaterMark), E_OK);
1409     EXPECT_EQ(meta.SaveLocalWaterMark(deviceB, setWaterMark), E_OK);
1410     WaterMark getWaterMark = 0;
1411     meta.GetLocalWaterMark(deviceA, getWaterMark);
1412     EXPECT_EQ(getWaterMark, setWaterMark);
1413     meta.GetLocalWaterMark(deviceB, getWaterMark);
1414     EXPECT_EQ(getWaterMark, setWaterMark);
1415 
1416 
1417     /**
1418     * @tc.steps: step3. init again
1419     * @tc.expected: step3. E_OK
1420     */
1421     Metadata anotherMeta;
1422     ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1423 
1424     /**
1425     * @tc.steps: step4. get waterMark again
1426     * @tc.expected: step4. expect get the same waterMark
1427     */
1428     anotherMeta.GetLocalWaterMark(deviceA, getWaterMark);
1429     EXPECT_EQ(getWaterMark, setWaterMark);
1430     anotherMeta.GetLocalWaterMark(deviceB, getWaterMark);
1431     EXPECT_EQ(getWaterMark, setWaterMark);
1432 }
1433 
1434 namespace {
InitVerifyStorageEnvironment(Metadata & meta,VirtualSingleVerSyncDBInterface & storage,const std::string & deviceId,const int & startCount,const uint32_t & maxStoreItems)1435 void InitVerifyStorageEnvironment(Metadata &meta, VirtualSingleVerSyncDBInterface &storage,
1436     const std::string &deviceId, const int &startCount, const uint32_t &maxStoreItems)
1437 {
1438     /**
1439     * @tc.steps: step1. initialize meta with storage
1440     * @tc.expected: step1. E_OK
1441     */
1442     ASSERT_EQ(meta.Initialize(&storage), E_OK);
1443 
1444     /**
1445     * @tc.steps: step2. fill items to metadata
1446     * @tc.expected: step2. E_OK
1447     */
1448     for (uint32_t i = startCount; i < maxStoreItems; i++) {
1449         std::string queryId = std::to_string(i);
1450         WaterMark recvWaterMark = i + 1;
1451         EXPECT_EQ(meta.SetRecvQueryWaterMark(queryId, deviceId, recvWaterMark), E_OK);
1452     }
1453 }
1454 }
1455 
1456 /**
1457  * @tc.name: VerifyManagerQuerySyncStorage 001
1458  * @tc.desc: Test metaData remove least used querySync storage items.
1459  * @tc.type: FUNC
1460  * @tc.require: AR000FN6G9
1461  * @tc.author: zhangqiquan
1462  */
1463 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyManagerQuerySyncStorage001, TestSize.Level3)
1464 {
1465     Metadata meta;
1466     VirtualSingleVerSyncDBInterface storage;
1467     const uint32_t maxStoreItems = 100000;
1468     const int startCount = 0;
1469     const std::string deviceId = "Device";
1470 
1471     InitVerifyStorageEnvironment(meta, storage, deviceId, startCount, maxStoreItems);
1472 
1473     /**
1474     * @tc.steps: step3. insert new items to metadata
1475     * @tc.expected: step3. E_OK
1476     */
1477     std::string newQueryId = std::to_string(maxStoreItems);
1478     WaterMark newWaterMark = maxStoreItems + 1;
1479     EXPECT_EQ(meta.SetRecvQueryWaterMark(newQueryId, deviceId, newWaterMark), E_OK);
1480 
1481     /**
1482     * @tc.steps: step4. touch the first item
1483     * @tc.expected: step4. E_OK update first item used time
1484     */
1485     std::string firstItemKey = std::to_string(startCount);
1486     WaterMark firstWaterMark = 11u;
1487     EXPECT_EQ(meta.SetRecvQueryWaterMark(firstItemKey, deviceId, firstWaterMark), E_OK);
1488 
1489     /**
1490     * @tc.steps: step5. initialize new meta with storage
1491     * @tc.expected: step5. the second item will be removed
1492     */
1493     Metadata newMeta;
1494     ASSERT_EQ(newMeta.Initialize(&storage), E_OK);
1495 
1496     /**
1497     * @tc.steps: step6. touch the first item
1498     * @tc.expected: step6. E_OK it still exist
1499     */
1500     WaterMark exceptWaterMark;
1501     EXPECT_EQ(newMeta.GetRecvQueryWaterMark(firstItemKey, deviceId, exceptWaterMark), E_OK);
1502     EXPECT_EQ(exceptWaterMark, firstWaterMark);
1503 
1504     /**
1505     * @tc.steps: step7. get the second item
1506     * @tc.expected: step7. NOT_FOUND secondWaterMark is zero
1507     */
1508     WaterMark secondWaterMark;
1509     std::string secondQueryId = std::to_string(startCount + 1);
1510     EXPECT_EQ(newMeta.GetRecvQueryWaterMark(secondQueryId, deviceId, secondWaterMark), E_OK);
1511     EXPECT_EQ(secondWaterMark, 0u);
1512 }
1513 
1514 /**
1515  * @tc.name: VerifyMetaDbCreateTime 001
1516  * @tc.desc: Test metaData get and set cbCreateTime.
1517  * @tc.type: FUNC
1518  * @tc.require: AR000FN6G9
1519  * @tc.author: zhuwentao
1520  */
1521 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDbCreateTime001, TestSize.Level1)
1522 {
1523     Metadata meta;
1524     VirtualSingleVerSyncDBInterface storage;
1525     /**
1526      * @tc.steps: step1. initialize meta with storage
1527      * @tc.expected: step1. E_OK
1528      */
1529     int errCode = meta.Initialize(&storage);
1530     ASSERT_EQ(errCode, E_OK);
1531     /**
1532      * @tc.steps: step2. set local and peer watermark and dbCreateTime
1533      * @tc.expected: step4. E_OK
1534      */
1535     WaterMark value = 2;
1536     EXPECT_EQ(meta.SaveLocalWaterMark("D1", value), E_OK);
1537     EXPECT_EQ(meta.SavePeerWaterMark("D1", value, true), E_OK);
1538     EXPECT_EQ(meta.SetDbCreateTime("D1", 10u, true), E_OK);
1539     /**
1540      * @tc.steps: step3. check peer and local watermark and dbCreateTime
1541      * @tc.expected: step4. E_OK
1542      */
1543     WaterMark curValue = 0;
1544     meta.GetLocalWaterMark("D1", curValue);
1545     EXPECT_EQ(value, curValue);
1546     meta.GetPeerWaterMark("D1", curValue);
1547     EXPECT_EQ(value, curValue);
1548     uint64_t curDbCreatTime = 0;
1549     meta.GetDbCreateTime("D1", curDbCreatTime);
1550     EXPECT_EQ(curDbCreatTime, 10u);
1551     /**
1552      * @tc.steps: step3. change dbCreateTime and check
1553      * @tc.expected: step4. E_OK
1554      */
1555     EXPECT_EQ(meta.SetDbCreateTime("D1", 20u, true), E_OK);
1556     uint64_t clearDeviceDataMark = INT_MAX;
1557     meta.GetRemoveDataMark("D1", clearDeviceDataMark);
1558     EXPECT_EQ(clearDeviceDataMark, 1u);
1559     EXPECT_EQ(meta.ResetMetaDataAfterRemoveData("D1"), E_OK);
1560     meta.GetRemoveDataMark("D1", clearDeviceDataMark);
1561     EXPECT_EQ(clearDeviceDataMark, 0u);
1562     meta.GetDbCreateTime("D1", curDbCreatTime);
1563     EXPECT_EQ(curDbCreatTime, 20u);
1564 }
1565 
1566 /**
1567  * @tc.name: VerifyManagerQuerySyncStorage 002
1568  * @tc.desc: Test metaData remove least used querySync storage items when exit wrong data.
1569  * @tc.type: FUNC
1570  * @tc.require: AR000FN6G9
1571  * @tc.author: zhangqiquan
1572  */
1573 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyManagerQuerySyncStorage002, TestSize.Level3)
1574 {
1575     Metadata meta;
1576     VirtualSingleVerSyncDBInterface storage;
1577     const uint32_t maxStoreItems = 100000;
1578     const int startCount = 0;
1579     const std::string deviceId = "Device";
1580 
1581     InitVerifyStorageEnvironment(meta, storage, deviceId, startCount, maxStoreItems);
1582 
1583     /**
1584     * @tc.steps: step3. insert a wrong Value
1585     * @tc.expected: step3. E_OK
1586     */
1587     std::string newQueryId = std::to_string(maxStoreItems);
1588     Key dbKey;
1589     DBCommon::StringToVector(QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey()
1590         + DBCommon::TransferHashString(deviceId) + newQueryId, dbKey);
1591     Value wrongValue;
1592     EXPECT_EQ(storage.PutMetaData(dbKey, wrongValue), E_OK);
1593 
1594     /**
1595     * @tc.steps: step4. initialize new meta with storage
1596     * @tc.expected: step4. E_OK
1597     */
1598     Metadata newMeta;
1599     ASSERT_EQ(newMeta.Initialize(&storage), E_OK);
1600 
1601     /**
1602     * @tc.steps: step5. touch the first item
1603     * @tc.expected: step5. E_OK still exit
1604     */
1605     std::string firstItemKey = std::to_string(startCount);
1606     WaterMark exceptWaterMark;
1607     EXPECT_EQ(newMeta.GetRecvQueryWaterMark(firstItemKey, deviceId, exceptWaterMark), E_OK);
1608     EXPECT_EQ(exceptWaterMark, 1u);
1609 }
1610 
1611 /**
1612  * @tc.name: AllPredicateQuerySync001
1613  * @tc.desc: Test normal push sync for AllPredicate data.
1614  * @tc.type: FUNC
1615  * @tc.require: AR000FN6G9
1616  * @tc.author: zhuwentao
1617  */
1618 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync001, TestSize.Level1)
1619 {
1620     /**
1621      * @tc.steps: step1. InitSchemaDb
1622      */
1623     InitSchemaDb();
1624     DBStatus status = OK;
1625     std::vector<std::string> devices;
1626     devices.push_back(g_deviceB->GetDeviceId());
1627 
1628     /**
1629      * @tc.steps: step2. deviceA put {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1630                          {key21, SCHEMA_VALUE2} - {key29, SCHEMA_VALUE2}
1631      */
1632     Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1633     Value value2(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end());
1634     Key key = {'1'};
1635     Key key2 = {'2'};
1636     const int dataSize = 4000;
1637     for (int i = 0; i < dataSize; i++) {
1638         key.push_back(i);
1639         key2.push_back(i);
1640         status = g_schemaKvDelegatePtr->Put(key, value);
1641         ASSERT_TRUE(status == OK);
1642         status = g_schemaKvDelegatePtr->Put(key2, value2);
1643         ASSERT_TRUE(status == OK);
1644         key.pop_back();
1645         key2.pop_back();
1646     }
1647     ASSERT_TRUE(status == OK);
1648 
1649     /**
1650      * @tc.steps: step3. deviceA call query sync and wait
1651      * @tc.expected: step3. sync should return OK.
1652      */
1653     Query query = Query::Select().EqualTo("$.field_name1", 1);
1654     std::map<std::string, DBStatus> result;
1655     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1656     ASSERT_TRUE(status == OK);
1657 
1658     /**
1659      * @tc.expected: step4. onComplete should be called, DeviceB have {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1660      */
1661     ASSERT_TRUE(result.size() == devices.size());
1662     for (const auto &pair : result) {
1663         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1664         EXPECT_TRUE(pair.second == OK);
1665     }
1666     VirtualDataItem item;
1667     VirtualDataItem item2;
1668     for (int i = 0; i < dataSize; i++) {
1669         key.push_back(i);
1670         key2.push_back(i);
1671         g_deviceB->GetData(key, item);
1672         EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
1673         EXPECT_TRUE(item.value == value);
1674         key.pop_back();
1675         key2.pop_back();
1676     }
1677 }
1678 
1679 /**
1680  * @tc.name: AllPredicateQuerySync002
1681  * @tc.desc: Test wrong query param push sync for AllPredicate data.
1682  * @tc.type: FUNC
1683  * @tc.require: AR000FN6G9
1684  * @tc.author: zhuwentao
1685  */
1686 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync002, TestSize.Level1)
1687 {
1688     /**
1689      * @tc.steps: step1. InitSchemaDb
1690      */
1691     InitSchemaDb();
1692     DBStatus status = OK;
1693     std::vector<std::string> devices;
1694     devices.push_back(g_deviceB->GetDeviceId());
1695 
1696     /**
1697      * @tc.steps: step2. deviceA call query sync and wait
1698      * @tc.expected: step2. sync should return INVALID_QUERY_FIELD
1699      */
1700     Query query = Query::Select().GreaterThan("field_name11", 10);
1701     std::map<std::string, DBStatus> result;
1702     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1703     ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1704     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, query);
1705     ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1706     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
1707     ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1708 }
1709 
1710 /**
1711  * @tc.name: AllPredicateQuerySync003
1712  * @tc.desc: Test normal push sync for AllPredicate data with limit
1713  * @tc.type: FUNC
1714  * @tc.require: AR000FN6G9
1715  * @tc.author: zhuwentao
1716  */
1717 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync003, TestSize.Level1)
1718 {
1719     /**
1720      * @tc.steps: step1. InitSchemaDb
1721      */
1722     InitSchemaDb();
1723     DBStatus status = OK;
1724     std::vector<std::string> devices;
1725     devices.push_back(g_deviceB->GetDeviceId());
1726 
1727     /**
1728      * @tc.steps: step2. deviceA put {key1, SCHEMA_VALUE1} - {key9, SCHEMA_VALUE1}
1729      */
1730     Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1731     Value value2(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end());
1732     Key key = {'1'};
1733     Key key2 = {'2'};
1734     const int dataSize = 10;
1735     for (int i = 0; i < dataSize; i++) {
1736         key.push_back(i);
1737         key2.push_back(i);
1738         status = g_schemaKvDelegatePtr->Put(key, value);
1739         ASSERT_TRUE(status == OK);
1740         status = g_schemaKvDelegatePtr->Put(key2, value2);
1741         ASSERT_TRUE(status == OK);
1742         key.pop_back();
1743         key2.pop_back();
1744     }
1745     ASSERT_TRUE(status == OK);
1746 
1747     /**
1748      * @tc.steps: step3. deviceA call query sync with limit and wait
1749      * @tc.expected: step3. sync should return OK.
1750      */
1751     Query query = Query::Select().EqualTo("$.field_name1", 1).Limit(20, 0);
1752     std::map<std::string, DBStatus> result;
1753     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1754     ASSERT_TRUE(status == OK);
1755 
1756     /**
1757      * @tc.expected: step4. onComplete should be called, DeviceB have {key1, SCHEMA_VALUE1} - {key9, SCHEMA_VALUE1}
1758      */
1759     ASSERT_TRUE(result.size() == devices.size());
1760     for (const auto &pair : result) {
1761         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1762         EXPECT_TRUE(pair.second == OK);
1763     }
1764     VirtualDataItem item;
1765     VirtualDataItem item2;
1766     for (int i = 0; i < dataSize; i++) {
1767         key.push_back(i);
1768         key2.push_back(i);
1769         g_deviceB->GetData(key, item);
1770         EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
1771         EXPECT_TRUE(item.value == value);
1772         key.pop_back();
1773         key2.pop_back();
1774     }
1775 }
1776 
1777 /**
1778  * @tc.name: AllPredicateQuerySync004
1779  * @tc.desc: Test normal pull sync for AllPredicate data.
1780  * @tc.type: FUNC
1781  * @tc.require: AR000FN6G9
1782  * @tc.author: zhuwentao
1783  */
1784 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync004, TestSize.Level1)
1785 {
1786     /**
1787      * @tc.steps: step1. InitSchemaDb
1788      */
1789     InitSchemaDb();
1790     DBStatus status = OK;
1791     std::vector<std::string> devices;
1792     devices.push_back(g_deviceB->GetDeviceId());
1793 
1794     /**
1795      * @tc.steps: step2. deviceB put {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1796      */
1797     Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1798     Key key = {'1'};
1799     const int dataSize = 10;
1800     for (int i = 0; i < dataSize; i++) {
1801         key.push_back(i);
1802         g_deviceB->PutData(key, value, 10 + i, 0);
1803         ASSERT_TRUE(status == OK);
1804         key.pop_back();
1805     }
1806     ASSERT_TRUE(status == OK);
1807 
1808     /**
1809      * @tc.steps: step3. deviceA call query sync and wait
1810      * @tc.expected: step3. sync should return OK.
1811      */
1812     Query query = Query::Select().EqualTo("$.field_name1", 1);
1813     std::map<std::string, DBStatus> result;
1814     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
1815     ASSERT_TRUE(status == OK);
1816 
1817     /**
1818      * @tc.expected: step4. onComplete should be called, DeviceA have {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1819      */
1820     ASSERT_TRUE(result.size() == devices.size());
1821     for (const auto &pair : result) {
1822         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1823         EXPECT_TRUE(pair.second == OK);
1824     }
1825     Value item;
1826     Value item2;
1827     for (int i = 0; i < dataSize; i++) {
1828         key.push_back(i);
1829         g_schemaKvDelegatePtr->Get(key, item);
1830         EXPECT_TRUE(item == value);
1831         key.pop_back();
1832     }
1833 }