• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 #include <thread>
18 
19 #include "db_common.h"
20 #include "db_constant.h"
21 #include "distributeddb_data_generate_unit_test.h"
22 #include "distributeddb_tools_unit_test.h"
23 #include "generic_single_ver_kv_entry.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "platform_specific.h"
27 #include "query.h"
28 #include "query_sync_object.h"
29 #include "single_ver_data_sync.h"
30 #include "single_ver_serialize_manager.h"
31 #include "sync_types.h"
32 #include "virtual_communicator.h"
33 #include "virtual_communicator_aggregator.h"
34 #include "virtual_single_ver_sync_db_Interface.h"
35 
36 using namespace testing::ext;
37 using namespace DistributedDB;
38 using namespace DistributedDBUnitTest;
39 using namespace std;
40 
41 namespace {
42     string g_testDir;
43     const string STORE_ID = "kv_store_sync_test";
44     const string SCHEMA_STORE_ID = "kv_store_sync_schema_test";
45     const std::string DEVICE_B = "deviceB";
46 
47     KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
48     KvStoreDelegateManager g_schemaMgr(SCHEMA_APP_ID, USER_ID);
49     KvStoreConfig g_config;
50     DistributedDBToolsUnitTest g_tool;
51     DBStatus g_kvDelegateStatus = INVALID_ARGS;
52     DBStatus g_schemaKvDelegateStatus = INVALID_ARGS;
53     KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
54     KvStoreNbDelegate* g_schemaKvDelegatePtr = nullptr;
55     VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
56     KvVirtualDevice *g_deviceB = nullptr;
57 
58     // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
59     auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
60         placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
61     auto g_schemaKvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
62         placeholders::_1, placeholders::_2, std::ref(g_schemaKvDelegateStatus), std::ref(g_schemaKvDelegatePtr));
63     const string SCHEMA_STRING =
64     "{\"SCHEMA_VERSION\":\"1.0\","
65     "\"SCHEMA_MODE\":\"STRICT\","
66     "\"SCHEMA_DEFINE\":{"
67     "\"field_name1\":\"BOOL\","
68     "\"field_name2\":\"BOOL\","
69     "\"field_name3\":\"INTEGER, NOT NULL\","
70     "\"field_name4\":\"LONG, DEFAULT 100\","
71     "\"field_name5\":\"DOUBLE, NOT NULL, DEFAULT 3.14\","
72     "\"field_name6\":\"STRING, NOT NULL, DEFAULT '3.1415'\","
73     "\"field_name7\":\"LONG, DEFAULT 100\","
74     "\"field_name8\":\"LONG, DEFAULT 100\","
75     "\"field_name9\":\"LONG, DEFAULT 100\","
76     "\"field_name10\":\"LONG, DEFAULT 100\""
77     "},"
78     "\"SCHEMA_INDEXES\":[\"$.field_name1\", \"$.field_name2\"]}";
79 
80     const std::string SCHEMA_VALUE1 =
81     "{\"field_name1\":true,"
82     "\"field_name2\":false,"
83     "\"field_name3\":10,"
84     "\"field_name4\":20,"
85     "\"field_name5\":3.14,"
86     "\"field_name6\":\"3.1415\","
87     "\"field_name7\":100,"
88     "\"field_name8\":100,"
89     "\"field_name9\":100,"
90     "\"field_name10\":100}";
91 
92     const std::string SCHEMA_VALUE2 =
93     "{\"field_name1\":false,"
94     "\"field_name2\":true,"
95     "\"field_name3\":100,"
96     "\"field_name4\":200,"
97     "\"field_name5\":3.14,"
98     "\"field_name6\":\"3.1415\","
99     "\"field_name7\":100,"
100     "\"field_name8\":100,"
101     "\"field_name9\":100,"
102     "\"field_name10\":100}";
103 }
104 
105 class DistributedDBSingleVerP2PQuerySyncTest : public testing::Test {
106 public:
107     static void SetUpTestCase(void);
108     static void TearDownTestCase(void);
109     void SetUp();
110     void TearDown();
111 };
112 
SetUpTestCase(void)113 void DistributedDBSingleVerP2PQuerySyncTest::SetUpTestCase(void)
114 {
115     /**
116      * @tc.setup: Init datadir and Virtual Communicator.
117      */
118     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
119     string dir = g_testDir + "/single_ver";
120     DIR* dirTmp = opendir(dir.c_str());
121     if (dirTmp == nullptr) {
122         OS::MakeDBDirectory(dir);
123     } else {
124         closedir(dirTmp);
125     }
126 
127     g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
128     ASSERT_TRUE(g_communicatorAggregator != nullptr);
129     RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
130 }
131 
TearDownTestCase(void)132 void DistributedDBSingleVerP2PQuerySyncTest::TearDownTestCase(void)
133 {
134     /**
135      * @tc.teardown: Release virtual Communicator and clear data dir.
136      */
137     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
138         LOGE("rm test db files error!");
139     }
140     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
141 }
142 
SetUp(void)143 void DistributedDBSingleVerP2PQuerySyncTest::SetUp(void)
144 {
145     DistributedDBToolsUnitTest::PrintTestCaseInfo();
146     /**
147      * @tc.setup: create virtual device B and get a KvStoreNbDelegate as deviceA
148      */
149     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
150     ASSERT_TRUE(g_deviceB != nullptr);
151     VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
152     ASSERT_TRUE(syncInterfaceB != nullptr);
153     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
154 }
155 
TearDown(void)156 void DistributedDBSingleVerP2PQuerySyncTest::TearDown(void)
157 {
158     /**
159      * @tc.teardown: Release device A, B
160      */
161     if (g_kvDelegatePtr != nullptr) {
162         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
163         g_kvDelegatePtr = nullptr;
164         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
165         LOGD("delete kv store status %d", status);
166         ASSERT_TRUE(status == OK);
167     }
168     if (g_schemaKvDelegatePtr != nullptr) {
169         ASSERT_EQ(g_schemaMgr.CloseKvStore(g_schemaKvDelegatePtr), OK);
170         g_schemaKvDelegatePtr = nullptr;
171         DBStatus status = g_schemaMgr.DeleteKvStore(SCHEMA_STORE_ID);
172         LOGD("delete kv store status %d", status);
173         ASSERT_TRUE(status == OK);
174     }
175     if (g_deviceB != nullptr) {
176         delete g_deviceB;
177         g_deviceB = nullptr;
178     }
179     PermissionCheckCallbackV2 nullCallback;
180     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
181 }
182 
InitNormalDb()183 void InitNormalDb()
184 {
185     g_config.dataDir = g_testDir;
186     g_mgr.SetKvStoreConfig(g_config);
187     KvStoreNbDelegate::Option option;
188     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
189     ASSERT_TRUE(g_kvDelegateStatus == OK);
190     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
191 }
192 
InitSchemaDb()193 void InitSchemaDb()
194 {
195     g_config.dataDir = g_testDir;
196     g_schemaMgr.SetKvStoreConfig(g_config);
197     KvStoreNbDelegate::Option option;
198     option.schema = SCHEMA_STRING;
199     g_schemaMgr.GetKvStore(SCHEMA_STORE_ID, option, g_schemaKvDelegateCallback);
200     ASSERT_TRUE(g_schemaKvDelegateStatus == OK);
201     ASSERT_TRUE(g_schemaKvDelegatePtr != nullptr);
202 }
203 
204 /**
205  * @tc.name: Normal Sync 001
206  * @tc.desc: Test normal push sync for keyprefix data.
207  * @tc.type: FUNC
208  * @tc.require: AR000FN6G9
209  * @tc.author: xushaohua
210  */
211 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync001, TestSize.Level1)
212 {
213     InitNormalDb();
214     DBStatus status = OK;
215     std::vector<std::string> devices;
216     devices.push_back(g_deviceB->GetDeviceId());
217 
218     /**
219      * @tc.steps: step1. deviceA put {k0, v0} - {k9, v9}
220      */
221     Key key = {'1'};
222     Value value = {'1'};
223     const int dataSize = 10;
224     for (int i = 0; i < dataSize; i++) {
225         key.push_back(i);
226         value.push_back(i);
227         status = g_kvDelegatePtr->Put(key, value);
228         ASSERT_TRUE(status == OK);
229         key.pop_back();
230         value.pop_back();
231     }
232     Key key2 = {'2'};
233     Value value2 = {'2'};
234     status = g_kvDelegatePtr->Put(key2, value2);
235     ASSERT_TRUE(status == OK);
236 
237     /**
238      * @tc.steps: step2. deviceA call query sync and wait
239      * @tc.expected: step2. sync should return OK.
240      */
241     Query query = Query::Select().PrefixKey(key);
242     std::map<std::string, DBStatus> result;
243     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
244     ASSERT_TRUE(status == OK);
245 
246     /**
247      * @tc.expected: step3. onComplete should be called, DeviceB have {k1,v1} - {k9, v9}
248      */
249     ASSERT_TRUE(result.size() == devices.size());
250     for (const auto &pair : result) {
251         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
252         EXPECT_TRUE(pair.second == OK);
253     }
254     VirtualDataItem item;
255     for (int i = 0; i < dataSize; i++) {
256         key.push_back(i);
257         value.push_back(i);
258         g_deviceB->GetData(key, item);
259         EXPECT_TRUE(item.value == value);
260         key.pop_back();
261         value.pop_back();
262     }
263     EXPECT_TRUE(g_deviceB->GetData(key2, item) != E_OK);
264 }
265 
266 /**
267  * @tc.name: Normal Sync 002
268  * @tc.desc: Test normal push sync for limit and offset.
269  * @tc.type: FUNC
270  * @tc.require: AR000FN6G9
271  * @tc.author: xushaohua
272  */
273 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync002, TestSize.Level1)
274 {
275     InitNormalDb();
276     DBStatus status = OK;
277     std::vector<std::string> devices;
278     devices.push_back(g_deviceB->GetDeviceId());
279 
280     /**
281      * @tc.steps: step1. deviceA put {k0, v0} - {k9, v9}
282      */
283     Key key = {'1'};
284     Value value = {'1'};
285     const int dataSize = 10;
286     for (int i = 0; i < dataSize; i++) {
287         key.push_back(i);
288         value.push_back(i);
289         status = g_kvDelegatePtr->Put(key, value);
290         ASSERT_TRUE(status == OK);
291         key.pop_back();
292         value.pop_back();
293     }
294 
295     /**
296      * @tc.steps: step2. deviceA call sync and wait
297      * @tc.expected: step2. sync should return OK.
298      */
299     const int limit = 5;
300     const int offset = 4;
301     Query query = Query::Select().PrefixKey(key).Limit(limit, offset);
302     std::map<std::string, DBStatus> result;
303     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
304     ASSERT_TRUE(status == OK);
305 
306     /**
307      * @tc.expected: step3. onComplete should be called, DeviceB have {k4,v4} {k8, v8}
308      */
309     ASSERT_TRUE(result.size() == devices.size());
310     for (const auto &pair : result) {
311         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
312         EXPECT_TRUE(pair.second == OK);
313     }
314 
315     VirtualDataItem item;
316     for (int i = limit - 1; i < limit + offset; i++) {
317         key.push_back(i);
318         value.push_back(i);
319         g_deviceB->GetData(key, item);
320         EXPECT_TRUE(item.value == value);
321         key.pop_back();
322         value.pop_back();
323     }
324 }
325 
326 /**
327  * @tc.name: Normal Sync 001
328  * @tc.desc: Test normal push_and_pull sync for keyprefix data.
329  * @tc.type: FUNC
330  * @tc.require: AR000FN6G9
331  * @tc.author: zhuwentao
332  */
333 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync003, TestSize.Level1)
334 {
335     InitNormalDb();
336     DBStatus status = OK;
337     std::vector<std::string> devices;
338     devices.push_back(g_deviceB->GetDeviceId());
339 
340     /**
341      * @tc.steps: step1. deviceA put {k, v}, {b, v}
342      */
343     Key key = {'1'};
344     Value value = {'1'};
345     const int dataSize = 10;
346     status = g_kvDelegatePtr->Put(key, value);
347     ASSERT_TRUE(status == OK);
348     Key key2 = {'2'};
349     Value value2 = {'2'};
350     status = g_kvDelegatePtr->Put(key2, value2);
351     ASSERT_TRUE(status == OK);
352 
353     /**
354      * @tc.steps: step2. deviceB put {b0, v0} - {b9, v9}, {c, v}
355      */
356     for (int i = 0; i < dataSize; i++) {
357         key2.push_back(i);
358         value2.push_back(i);
359         g_deviceB->PutData(key2, value2, 10 + i, 0);
360         key2.pop_back();
361         value2.pop_back();
362     }
363     Key key3 = {'3'};
364     Value value3 = {'3'};
365     g_deviceB->PutData(key3, value3, 20, 0);
366 
367     /**
368      * @tc.steps: step2. deviceA call query sync and wait
369      * @tc.expected: step2. sync should return OK.
370      */
371     Query query = Query::Select().PrefixKey(key2);
372     std::map<std::string, DBStatus> result;
373     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, query);
374     ASSERT_TRUE(status == OK);
375 
376     /**
377      * @tc.expected: step3. onComplete should be called, DeviceA have {b0, v0} - {b9, v9}, DeviceB have {b, v}
378      */
379     ASSERT_TRUE(result.size() == devices.size());
380     for (const auto &pair : result) {
381         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
382         EXPECT_TRUE(pair.second == OK);
383     }
384     VirtualDataItem item;
385     Value tmpValue;
386     for (int i = 0; i < dataSize; i++) {
387         key2.push_back(i);
388         value2.push_back(i);
389         g_kvDelegatePtr->Get(key2, tmpValue);
390         EXPECT_TRUE(tmpValue == value2);
391         key2.pop_back();
392         value2.pop_back();
393     }
394     EXPECT_TRUE(g_deviceB->GetData(key, item) != E_OK);
395     EXPECT_TRUE(g_deviceB->GetData(key2, item) == E_OK);
396     g_kvDelegatePtr->Get(key3, tmpValue);
397     EXPECT_TRUE(tmpValue != value3);
398 }
399 
400 /**
401  * @tc.name: Normal Sync 001
402  * @tc.desc: Test normal pull sync for keyprefix data.
403  * @tc.type: FUNC
404  * @tc.require: AR000FN6G9
405  * @tc.author: zhuwentao
406  */
407 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync004, TestSize.Level1)
408 {
409     InitNormalDb();
410     DBStatus status = OK;
411     std::vector<std::string> devices;
412     devices.push_back(g_deviceB->GetDeviceId());
413     /**
414      * @tc.steps: step1. deviceB put {k1, v1} - {k9, k9}, {b0, v0} - {b9, v9}
415      */
416     Key key = {'1'};
417     Value value = {'1'};
418     const int dataSize = 10;
419     Key key2 = {'2'};
420     Value value2 = {'2'};
421     vector<std::pair<Key, Value>> key1Vec;
422     vector<std::pair<Key, Value>> key2Vec;
423     for (int i = 0; i < dataSize; i++) {
424         Key tmpKey(key);
425         Value tmpValue(value);
426         tmpKey.push_back(i);
427         tmpValue.push_back(i);
428         key1Vec.push_back(pair<Key, Value> {tmpKey, tmpValue});
429     }
430     for (int i = 0; i < dataSize; i++) {
431         Key tmpKey(key2);
432         Value tmpValue(value2);
433         tmpKey.push_back(i);
434         tmpValue.push_back(i);
435         key2Vec.push_back(pair<Key, Value> {tmpKey, tmpValue});
436     }
437     for (int i = 0; i < dataSize; i++) {
438         g_deviceB->PutData(key2Vec[i].first, key2Vec[i].second, 20 + i, 0);
439         g_deviceB->PutData(key1Vec[i].first, key1Vec[i].second, 10 + i, 0);
440     }
441 
442     /**
443      * @tc.steps: step2. deviceA call query sync and wait
444      * @tc.expected: step2. sync should return OK.
445      */
446     Query query = Query::Select().PrefixKey(key2);
447     std::map<std::string, DBStatus> result;
448     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
449     ASSERT_TRUE(status == OK);
450 
451     /**
452      * @tc.expected: step3. onComplete should be called, DeviceA have {b0, v0} - {b9, v9}
453      */
454     ASSERT_TRUE(result.size() == devices.size());
455     for (const auto &pair : result) {
456         EXPECT_TRUE(pair.second == OK);
457     }
458     VirtualDataItem item;
459     Value tmpValue;
460     for (int i = 0; i < dataSize; i++) {
461         g_kvDelegatePtr->Get(key2Vec[i].first, tmpValue);
462         EXPECT_TRUE(tmpValue == key2Vec[i].second);
463         g_kvDelegatePtr->Get(key1Vec[i].first, tmpValue);
464         EXPECT_TRUE(tmpValue != key1Vec[i].second);
465     }
466 }
467 
468 /**
469  * @tc.name: NormalSync005
470  * @tc.desc: Test normal push sync for inkeys query.
471  * @tc.type: FUNC
472  * @tc.require: AR000GOHO7
473  * @tc.author: lidongwei
474  */
475 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync005, TestSize.Level1)
476 {
477     InitNormalDb();
478     std::vector<std::string> devices;
479     devices.push_back(g_deviceB->GetDeviceId());
480 
481     /**
482      * @tc.steps: step1. deviceA put K1-K5
483      */
484     ASSERT_EQ(g_kvDelegatePtr->PutBatch(
485         {{KEY_1, VALUE_1}, {KEY_2, VALUE_2}, {KEY_3, VALUE_3}, {KEY_4, VALUE_4}, {KEY_5, VALUE_5}}), OK);
486 
487     /**
488      * @tc.steps: step2. deviceA sync K2,K4 and wait
489      * @tc.expected: step2. sync should return OK.
490      */
491     Query query = Query::Select().InKeys({KEY_2, KEY_4});
492     std::map<std::string, DBStatus> result;
493     ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), OK);
494 
495     /**
496      * @tc.expected: step3. onComplete should be called.
497      */
498     ASSERT_EQ(result.size(), devices.size());
499     for (const auto &pair : result) {
500         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
501         EXPECT_EQ(pair.second, OK);
502     }
503 
504     /**
505      * @tc.steps: step4. deviceB have K2K4 and have no K1K3K5.
506      * @tc.expected: step4. sync should return OK.
507      */
508     VirtualDataItem item;
509     EXPECT_EQ(g_deviceB->GetData(KEY_2, item), E_OK);
510     EXPECT_EQ(item.value, VALUE_2);
511     EXPECT_EQ(g_deviceB->GetData(KEY_4, item), E_OK);
512     EXPECT_EQ(item.value, VALUE_4);
513     EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
514     EXPECT_EQ(g_deviceB->GetData(KEY_3, item), -E_NOT_FOUND);
515     EXPECT_EQ(g_deviceB->GetData(KEY_5, item), -E_NOT_FOUND);
516 
517     /**
518      * @tc.steps: step5. deviceA sync with invalid inkeys query
519      * @tc.expected: step5. sync failed and the rc is right.
520      */
521     query = Query::Select().InKeys({});
522     result.clear();
523     ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), INVALID_ARGS);
524 
525     std::set<Key> keys;
526     for (uint8_t i = 0; i < DBConstant::MAX_BATCH_SIZE + 1; i++) {
527         Key key = { i };
528         keys.emplace(key);
529     }
530     query = Query::Select().InKeys(keys);
531     result.clear();
532     ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), OVER_MAX_LIMITS);
533 
534     query = Query::Select().InKeys({{}});
535     result.clear();
536     ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), INVALID_ARGS);
537 }
538 
539 /**
540  * @tc.name: NormalSync006
541  * @tc.desc: Test normal push sync with query by 32 devices;
542  * @tc.type: FUNC
543  * @tc.require:
544  * @tc.author: zhuwentao
545  */
546 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync006, TestSize.Level1)
547 {
548     /**
549      * @tc.steps: step1. init db and 32 devices
550      */
551     InitNormalDb();
552     uint32_t syncDevCount = 32u;
553     std::vector<KvVirtualDevice *> virtualDeviceVec(syncDevCount, nullptr);
554     const std::string device = "deviceTmp_";
555     std::vector<std::string> devices;
556     bool isError = false;
557     for (uint32_t i = 0; i < syncDevCount; i++) {
558         std::string tmpDev = device + std::to_string(i);
559         virtualDeviceVec[i] = new (std::nothrow) KvVirtualDevice(tmpDev);
560         if (virtualDeviceVec[i] == nullptr) {
561             isError = true;
562             break;
563         }
564         VirtualSingleVerSyncDBInterface *tmpSyncInterface = new (std::nothrow) VirtualSingleVerSyncDBInterface();
565         if (tmpSyncInterface == nullptr) {
566             isError = true;
567             break;
568         }
569         ASSERT_EQ(virtualDeviceVec[i]->Initialize(g_communicatorAggregator, tmpSyncInterface), E_OK);
570         devices.push_back(virtualDeviceVec[i]->GetDeviceId());
571     }
572     if (isError) {
573         for (uint32_t i = 0; i < syncDevCount; i++) {
574             if (virtualDeviceVec[i] != nullptr) {
575                 delete virtualDeviceVec[i];
576                 virtualDeviceVec[i] = nullptr;
577             }
578         }
579         ASSERT_TRUE(false);
580     }
581     /**
582      * @tc.steps: step2. deviceA put {k0, v0}
583      */
584     Key key = {'1'};
585     Value value = {'1'};
586     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
587     /**
588      * @tc.steps: step3. deviceA call query sync and wait
589      * @tc.expected: step3. sync should return OK.
590      */
591     Query query = Query::Select().PrefixKey(key);
592     std::map<std::string, DBStatus> result;
593     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query) == OK);
594 
595     /**
596      * @tc.expected: step3. onComplete should be called, DeviceB have {k1,v1} - {k9, v9}
597      */
598     ASSERT_TRUE(result.size() == devices.size());
599     for (const auto &pair : result) {
600         EXPECT_TRUE(pair.second == OK);
601     }
602     VirtualDataItem item;
603     for (uint32_t i = 0; i < syncDevCount; i++) {
604         EXPECT_TRUE(virtualDeviceVec[i]->GetData(key, item) == E_OK);
605         EXPECT_EQ(item.value, value);
606         delete virtualDeviceVec[i];
607         virtualDeviceVec[i] = nullptr;
608     }
609 }
610 
611 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, QueryRequestPacketTest001, TestSize.Level1)
612 {
613     /**
614      * @tc.steps: step1. prepare a QuerySyncRequestPacket.
615      */
616     auto packet = new (std::nothrow) DataRequestPacket;
617     ASSERT_TRUE(packet != nullptr);
618     auto kvEntry = new (std::nothrow) GenericSingleVerKvEntry;
619     ASSERT_TRUE(kvEntry != nullptr);
620     kvEntry->SetTimestamp(1);
621     SyncEntry syncData {.entries = {kvEntry}};
622 #ifndef OMIT_ZLIB
623     ASSERT_TRUE(GenericSingleVerKvEntry::Compress(syncData.entries, syncData.compressedEntries,
624         {CompressAlgorithm::ZLIB, SOFTWARE_VERSION_CURRENT}) == E_OK);
625     packet->SetCompressAlgo(CompressAlgorithm::ZLIB);
626     packet->SetFlag(4); // set IS_COMPRESS_DATA flag true
627 #endif
628     packet->SetBasicInfo(-E_NOT_SUPPORT, SOFTWARE_VERSION_CURRENT, SyncModeType::QUERY_PUSH_PULL);
629     packet->SetData(syncData.entries);
630     packet->SetCompressData(syncData.compressedEntries);
631     packet->SetEndWaterMark(INT8_MAX);
632     packet->SetWaterMark(INT16_MAX, INT32_MAX, INT64_MAX);
633     QuerySyncObject syncQuery(Query::Select().PrefixKey({'2'}));
634     packet->SetQuery(syncQuery);
635     packet->SetQueryId(syncQuery.GetIdentify());
636     packet->SetReserved(std::vector<uint64_t> {INT8_MAX});
637 
638     /**
639      * @tc.steps: step2. put the QuerySyncRequestPacket into a message.
640      */
641     Message msg;
642     msg.SetExternalObject(packet);
643     msg.SetMessageId(QUERY_SYNC_MESSAGE);
644     msg.SetMessageType(TYPE_REQUEST);
645 
646     /**
647      * @tc.steps: step3. Serialization the message to a buffer.
648      */
649     int len = static_cast<int>(SingleVerSerializeManager::CalculateLen(&msg));
650     vector<uint8_t> buffer(len);
651     ASSERT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), &msg), E_OK);
652 
653     /**
654      * @tc.steps: step4. DeSerialization the buffer to a message.
655      */
656     Message outMsg(QUERY_SYNC_MESSAGE);
657     outMsg.SetMessageType(TYPE_REQUEST);
658     ASSERT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), &outMsg), E_OK);
659 
660     /**
661      * @tc.steps: step5. checkout the outMsg.
662      * @tc.expected: step5. outMsg equal the the in msg
663      */
664     auto outPacket = outMsg.GetObject<DataRequestPacket>();
665     EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
666     EXPECT_EQ(outPacket->GetMode(), SyncModeType::QUERY_PUSH_PULL);
667     EXPECT_EQ(outPacket->GetEndWaterMark(), static_cast<uint64_t>(INT8_MAX));
668     EXPECT_EQ(outPacket->GetLocalWaterMark(), static_cast<uint64_t>(INT16_MAX));
669     EXPECT_EQ(outPacket->GetPeerWaterMark(), static_cast<uint64_t>(INT32_MAX));
670     EXPECT_EQ(outPacket->GetDeletedWaterMark(), static_cast<uint64_t>(INT64_MAX));
671 #ifndef OMIT_ZLIB
672     EXPECT_EQ(outPacket->GetFlag(), static_cast<uint32_t>(4)); // check IS_COMPRESS_DATA flag true
673 #endif
674     EXPECT_EQ(outPacket->GetQueryId(), syncQuery.GetIdentify());
675     EXPECT_EQ(outPacket->GetReserved(), std::vector<uint64_t> {INT8_MAX});
676     EXPECT_EQ(outPacket->GetSendCode(), -E_NOT_SUPPORT);
677     EXPECT_EQ(outPacket->GetData()[0]->GetTimestamp(), 1u);
678 }
679 
680 /**
681  * @tc.name: QueryRequestPacketTest002
682  * @tc.desc: Test exception branch of serialization.
683  * @tc.type: FUNC
684  * @tc.require:
685  * @tc.author: zhangshijie
686  */
687 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, SerializationManager001, TestSize.Level1)
688 {
689     /**
690      * @tc.steps: step1. call SingleVerSerializeManager::Serialization with buffer = nullptr or msg = nullptr
691      * @tc.expected:step1 return -E_MESSAGE_ID_ERROR
692      */
693     Message msg;
694     msg.SetMessageType(TYPE_INVALID);
695     vector<uint8_t> buffer(10); // 10 is test buffer len
696     EXPECT_EQ(SingleVerSerializeManager::Serialization(nullptr, buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
697     EXPECT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), nullptr), -E_MESSAGE_ID_ERROR);
698 
699     /**
700      * @tc.steps: step2. call SingleVerSerializeManager::Serialization with invalid type message
701      * @tc.expected:step2 return -E_MESSAGE_ID_ERROR
702      */
703     EXPECT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
704 
705     /**
706      * @tc.steps: step3. call SingleVerSerializeManager::DeSerialization with buffer = nullptr or msg = nullptr
707      * @tc.expected:step3 return -E_MESSAGE_ID_ERROR
708      */
709     EXPECT_EQ(SingleVerSerializeManager::DeSerialization(nullptr, buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
710     EXPECT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), nullptr), -E_MESSAGE_ID_ERROR);
711 
712     /**
713      * @tc.steps: step4. call SingleVerSerializeManager::DeSerialization with invalid type message
714      * @tc.expected:step4 return -E_MESSAGE_ID_ERROR
715      */
716     EXPECT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
717 }
718 
719 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, QueryAckPacketTest001, TestSize.Level1)
720 {
721     /**
722      * @tc.steps: step1. prepare a QuerySyncAckPacket.
723      */
724     DataAckPacket packet;
725     packet.SetVersion(SOFTWARE_VERSION_CURRENT);
726     packet.SetData(INT64_MAX);
727     packet.SetRecvCode(-E_NOT_SUPPORT);
728     std::vector<uint64_t> reserved = {INT8_MAX};
729     packet.SetReserved(reserved);
730 
731     /**
732      * @tc.steps: step2. put the QuerySyncAckPacket into a message.
733      */
734     Message msg;
735     msg.SetCopiedObject(packet);
736     msg.SetMessageId(QUERY_SYNC_MESSAGE);
737     msg.SetMessageType(TYPE_RESPONSE);
738 
739     /**
740      * @tc.steps: step3. Serialization the message to a buffer.
741      */
742     int len = static_cast<int>(SingleVerSerializeManager::CalculateLen(&msg));
743     LOGE("test leng = %d", len);
744     uint8_t *buffer = new (nothrow) uint8_t[len];
745     ASSERT_TRUE(buffer != nullptr);
746     int errCode = SingleVerSerializeManager::Serialization(buffer, len, &msg);
747     ASSERT_EQ(errCode, E_OK);
748 
749     /**
750      * @tc.steps: step4. DeSerialization the buffer to a message.
751      */
752     Message outMsg;
753     outMsg.SetMessageId(QUERY_SYNC_MESSAGE);
754     outMsg.SetMessageType(TYPE_RESPONSE);
755     errCode = SingleVerSerializeManager::DeSerialization(buffer, len, &outMsg);
756     ASSERT_EQ(errCode, E_OK);
757 
758     /**
759      * @tc.steps: step5. checkout the outMsg.
760      * @tc.expected: step5. outMsg equal the the in msg
761      */
762     auto outPacket = outMsg.GetObject<DataAckPacket>();
763     EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
764     EXPECT_EQ(outPacket->GetData(), static_cast<uint64_t>(INT64_MAX));
765     std::vector<uint64_t> reserved2 = {INT8_MAX};
766     EXPECT_EQ(outPacket->GetReserved(), reserved2);
767     EXPECT_EQ(outPacket->GetRecvCode(), -E_NOT_SUPPORT);
768     delete[] buffer;
769 }
770 
771 /**
772  * @tc.name: GetQueryWaterMark 001
773  * @tc.desc: Test metaData save and get queryWaterMark.
774  * @tc.type: FUNC
775  * @tc.require: AR000FN6G9
776  * @tc.author: zhangqiquan
777  */
778 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark001, TestSize.Level1)
779 {
780     VirtualSingleVerSyncDBInterface storage;
781     Metadata meta;
782 
783     /**
784      * @tc.steps: step1. initialize meta with storage
785      * @tc.expected: step1. E_OK
786      */
787     int errCode = meta.Initialize(&storage);
788     ASSERT_EQ(errCode, E_OK);
789 
790     /**
791      * @tc.steps: step2. save receive and send watermark
792      * @tc.expected: step2. E_OK
793      */
794     WaterMark w1 = 1;
795     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w1), E_OK);
796     EXPECT_EQ(meta.SetSendQueryWaterMark("Q1", "D1", w1), E_OK);
797 
798     /**
799      * @tc.steps: step3. get receive and send watermark
800      * @tc.expected: step3. E_OK and get the latest value
801      */
802     WaterMark w = 0;
803     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w), E_OK);
804     EXPECT_EQ(w1, w);
805     EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", w), E_OK);
806     EXPECT_EQ(w1, w);
807 
808     /**
809      * @tc.steps: step4. set peer and local watermark
810      * @tc.expected: step4. E_OK
811      */
812     WaterMark w2 = 2;
813     EXPECT_EQ(meta.SaveLocalWaterMark("D1", w2), E_OK);
814     EXPECT_EQ(meta.SavePeerWaterMark("D1", w2, true), E_OK);
815 
816     /**
817      * @tc.steps: step5. get receive and send watermark
818      * @tc.expected: step5. E_OK and get the w1
819      */
820     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w), E_OK);
821     EXPECT_EQ(w2, w);
822     EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", w), E_OK);
823     EXPECT_EQ(w2, w);
824 
825     /**
826      * @tc.steps: step6. set peer and local watermark
827      * @tc.expected: step6. E_OK
828      */
829     WaterMark w3 = 3;
830     EXPECT_EQ(meta.SaveLocalWaterMark("D2", w3), E_OK);
831     EXPECT_EQ(meta.SavePeerWaterMark("D2", w3, true), E_OK);
832 
833     /**
834      * @tc.steps: step7. get receive and send watermark
835      * @tc.expected: step7. E_OK and get the w3
836      */
837     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q2", "D2", w), E_OK);
838     EXPECT_EQ(w3, w);
839     EXPECT_EQ(meta.GetSendQueryWaterMark("Q2", "D2", w), E_OK);
840     EXPECT_EQ(w3, w);
841 
842     /**
843      * @tc.steps: step8. get not exit receive and send watermark
844      * @tc.expected: step8. E_OK and get the 0
845      */
846     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q3", "D3", w), E_OK);
847     EXPECT_EQ(w, 0u);
848     EXPECT_EQ(meta.GetSendQueryWaterMark("Q3", "D3", w), E_OK);
849     EXPECT_EQ(w, 0u);
850 }
851 
852 /**
853  * @tc.name: GetQueryWaterMark 002
854  * @tc.desc: Test metaData save and get queryWaterMark after push or pull mode.
855  * @tc.type: FUNC
856  * @tc.require: AR000FN6G9
857  * @tc.author: zhangqiquan
858  */
859 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark002, TestSize.Level1)
860 {
861     VirtualSingleVerSyncDBInterface storage;
862     Metadata meta;
863 
864     /**
865      * @tc.steps: step1. initialize meta with storage
866      * @tc.expected: step1. E_OK
867      */
868     int errCode = meta.Initialize(&storage);
869     ASSERT_EQ(errCode, E_OK);
870 
871     /**
872      * @tc.steps: step2. set peer and local watermark
873      * @tc.expected: step2. E_OK
874      */
875     WaterMark w1 = 2;
876     EXPECT_EQ(meta.SaveLocalWaterMark("D1", w1), E_OK);
877     EXPECT_EQ(meta.SavePeerWaterMark("D1", w1, true), E_OK);
878 
879     /**
880      * @tc.steps: step2. save receive and send watermark
881      * @tc.expected: step2. E_OK
882      */
883     WaterMark w2 = 1;
884     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w2), E_OK);
885     EXPECT_EQ(meta.SetSendQueryWaterMark("Q1", "D1", w2), E_OK);
886 
887     /**
888      * @tc.steps: step3. get receive and send watermark
889      * @tc.expected: step3. E_OK and get the bigger value
890      */
891     WaterMark w = 0;
892     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w), E_OK);
893     EXPECT_EQ(w1, w);
894     EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", w), E_OK);
895     EXPECT_EQ(w1, w);
896 }
897 
898 /**
899  * @tc.name: GetQueryWaterMark 003
900  * @tc.desc: check time offset after remove water mark
901  * @tc.type: FUNC
902  * @tc.require:
903  * @tc.author: lianhuix
904  */
905 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark003, TestSize.Level1)
906 {
907     VirtualSingleVerSyncDBInterface storage;
908     Metadata meta;
909 
910     int errCode = meta.Initialize(&storage);
911     ASSERT_EQ(errCode, E_OK);
912 
913     const std::string DEVICE_B = "DEVICE_B";
914     TimeOffset offset = 100; // 100: offset
915     meta.SaveTimeOffset(DEVICE_B, offset);
916 
917     WaterMark w1 = 2; // 2: watermark
918     meta.SavePeerWaterMark(DBCommon::TransferHashString(DEVICE_B), w1, false);
919 
920     TimeOffset offsetGot;
921     meta.GetTimeOffset(DEVICE_B, offsetGot);
922     EXPECT_EQ(offsetGot, offset);
923 }
924 
925 /**
926  * @tc.name: GetDeleteWaterMark001
927  * @tc.desc: Test metaData save and get deleteWaterMark.
928  * @tc.type: FUNC
929  * @tc.require: AR000FN6G9
930  * @tc.author: zhangqiquan
931  */
932 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteWaterMark001, TestSize.Level1)
933 {
934     VirtualSingleVerSyncDBInterface storage;
935     Metadata meta;
936 
937     /**
938      * @tc.steps: step1. initialize meta with storage
939      * @tc.expected: step1. E_OK
940      */
941     int errCode = meta.Initialize(&storage);
942     ASSERT_EQ(errCode, E_OK);
943 
944     /**
945      * @tc.steps: step2. set and get recv/send delete watermark
946      * @tc.expected: step2. set E_OK and get water mark is equal with last set
947      */
948     const std::string device = "DEVICE";
949     const WaterMark maxWaterMark = 1000u;
__anond092fe7f0202() 950     std::thread recvThread([&meta, &device, &maxWaterMark]() {
951         for (WaterMark expectRecv = 0u; expectRecv < maxWaterMark; ++expectRecv) {
952             EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark(device, expectRecv), E_OK);
953             WaterMark actualRecv = 0u;
954             EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark(device, actualRecv), E_OK);
955             EXPECT_EQ(actualRecv, expectRecv);
956         }
957     });
__anond092fe7f0302() 958     std::thread sendThread([&meta, &device, &maxWaterMark]() {
959         for (WaterMark expectSend = 0u; expectSend < maxWaterMark; ++expectSend) {
960             EXPECT_EQ(meta.SetSendDeleteSyncWaterMark(device, expectSend), E_OK);
961             WaterMark actualSend = 0u;
962             EXPECT_EQ(meta.GetSendDeleteSyncWaterMark(device, actualSend), E_OK);
963             EXPECT_EQ(actualSend, expectSend);
964         }
965     });
966     recvThread.join();
967     sendThread.join();
968 }
969 
970 /**
971  * @tc.name: ClearQueryWaterMark 001
972  * @tc.desc: Test metaData clear watermark function.
973  * @tc.type: FUNC
974  * @tc.require: AR000FN6G9
975  * @tc.author: zhangqiquan
976  */
977 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark001, TestSize.Level1)
978 {
979     VirtualSingleVerSyncDBInterface storage;
980     Metadata meta;
981 
982     /**
983      * @tc.steps: step1. initialize meta with storage
984      * @tc.expected: step1. E_OK
985      */
986     int errCode = meta.Initialize(&storage);
987     ASSERT_EQ(errCode, E_OK);
988 
989     /**
990      * @tc.steps: step2. save receive watermark
991      * @tc.expected: step2. E_OK
992      */
993     WaterMark w1 = 1;
994     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w1), E_OK);
995 
996     /**
997      * @tc.steps: step3. erase peer watermark
998      * @tc.expected: step3. E_OK
999      */
1000     EXPECT_EQ(meta.EraseDeviceWaterMark("D1", true), E_OK);
1001 
1002     /**
1003      * @tc.steps: step4. get receive watermark
1004      * @tc.expected: step4. E_OK receive watermark is zero
1005      */
1006     WaterMark w2 = -1;
1007     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w2), E_OK);
1008     EXPECT_EQ(w2, 0u);
1009 
1010     /**
1011      * @tc.steps: step5. set peer watermark
1012      * @tc.expected: step5. E_OK
1013      */
1014     WaterMark w3 = 2;
1015     EXPECT_EQ(meta.SavePeerWaterMark("D1", w3, true), E_OK);
1016 
1017     /**
1018      * @tc.steps: step6. get receive watermark
1019      * @tc.expected: step6. E_OK receive watermark is peer watermark
1020      */
1021     WaterMark w4 = -1;
1022     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w4), E_OK);
1023     EXPECT_EQ(w4, w3);
1024 }
1025 
1026 /**
1027  * @tc.name: ClearQueryWaterMark 002
1028  * @tc.desc: Test metaData clear watermark function.
1029  * @tc.type: FUNC
1030  * @tc.require: AR000FN6G9
1031  * @tc.author: zhangqiquan
1032  */
1033 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark002, TestSize.Level1)
1034 {
1035     VirtualSingleVerSyncDBInterface storage;
1036     Metadata meta;
1037 
1038     /**
1039      * @tc.steps: step1. initialize meta with storage
1040      * @tc.expected: step1. E_OK
1041      */
1042     int errCode = meta.Initialize(&storage);
1043     ASSERT_EQ(errCode, E_OK);
1044 
1045     /**
1046      * @tc.steps: step2. save receive watermark
1047      * @tc.expected: step2. E_OK
1048      */
1049     WaterMark w1 = 1;
1050     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w1), E_OK);
1051     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q2", "D1", w1), E_OK);
1052     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D2", w1), E_OK);
1053 
1054     /**
1055      * @tc.steps: step3. erase peer watermark, make sure data remove in db
1056      * @tc.expected: step3. E_OK
1057      */
1058     Metadata anotherMeta;
1059     ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1060     EXPECT_EQ(anotherMeta.EraseDeviceWaterMark("D1", true), E_OK);
1061 
1062     /**
1063      * @tc.steps: step4. get receive watermark
1064      * @tc.expected: step4. E_OK receive watermark is zero
1065      */
1066     WaterMark w2 = -1;
1067     EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q1", "D1", w2), E_OK);
1068     EXPECT_EQ(w2, 0u);
1069     w2 = -1;
1070     EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q2", "D1", w2), E_OK);
1071     EXPECT_EQ(w2, 0u);
1072     w2 = -1;
1073     EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q1", "D2", w2), E_OK);
1074     EXPECT_EQ(w2, w1);
1075 }
1076 
1077 /**
1078  * @tc.name: ClearQueryWaterMark 003
1079  * @tc.desc: Test metaData clear watermark busy.
1080  * @tc.type: FUNC
1081  * @tc.require: AR000FN6G9
1082  * @tc.author: zhangqiquan
1083  */
1084 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark003, TestSize.Level1)
1085 {
1086     VirtualSingleVerSyncDBInterface storage;
1087     Metadata meta;
1088     /**
1089      * @tc.steps: step1. initialize meta with storage
1090      * @tc.expected: step1. E_OK
1091      */
1092     int errCode = meta.Initialize(&storage);
1093     ASSERT_EQ(errCode, E_OK);
1094     /**
1095      * @tc.steps: step2. set busy and erase water mark
1096      * @tc.expected: step2. -E_BUSY
1097      */
1098     storage.SetBusy(false, true);
1099     EXPECT_EQ(meta.EraseDeviceWaterMark("DEVICE_ID", true), -E_BUSY);
1100 }
1101 
1102 /**
1103  * @tc.name: GetQueryLastTimestamp001
1104  * @tc.desc: Test function of GetQueryLastTimestamp.
1105  * @tc.type: FUNC
1106  * @tc.require: AR000FN6G9
1107  * @tc.author: zhangshijie
1108  */
1109 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryLastTimestamp001, TestSize.Level1)
1110 {
1111     /**
1112      * @tc.steps: step1. initialize meta with nullptr
1113      * @tc.expected: step1. return -E_INVALID_DB
1114      */
1115     Metadata meta;
1116     EXPECT_EQ(meta.Initialize(nullptr), -E_INVALID_DB);
1117 
1118     /**
1119      * @tc.steps: step2. initialize meta with storage
1120      * @tc.expected: step2. E_OK
1121      */
1122     VirtualSingleVerSyncDBInterface storage;
1123     int errCode = meta.Initialize(&storage);
1124     ASSERT_EQ(errCode, E_OK);
1125 
1126     /**
1127      * @tc.steps: step3. call GetQueryLastTimestamp with a non-exists device
1128      * @tc.expected: step3. return INT64_MAX
1129      */
1130     EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q1"), static_cast<uint64_t>(INT64_MAX));
1131 
1132     /**
1133      * @tc.steps: step4. call GetQueryLastTimestamp with device D1 again
1134      * @tc.expected: step4. return 0
1135      */
1136     EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q1"), 0u);
1137 
1138     /**
1139      * @tc.steps: step5. call GetQueryLastTimestamp with device D1 and Q2
1140      * @tc.expected: step5. return INT64_MAX
1141      */
1142     EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q2"), static_cast<uint64_t>(INT64_MAX));
1143 }
1144 
1145 /**
1146  * @tc.name: GetQueryLastTimestamp002
1147  * @tc.desc: Test Metadata::GetQueryLastTimestamp when timestamp out of INT64 range.
1148  * @tc.type: FUNC
1149  * @tc.author: liuhongyang
1150  */
1151 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryLastTimestamp002, TestSize.Level1)
1152 {
1153     /**
1154      * @tc.steps: step1. initialize storage with fake meta data
1155      * @tc.expected: step1. E_OK
1156      */
1157     const int64_t validInt64Num1 = 3000000000000000000; // a random valid int64 number
1158     const int64_t validInt64Num2 = 3000000000000000001; // a random valid int64 number
1159     // key is queryId, value is: [stored timestamp in db, expect return value of GetQueryLastTimestamp]
1160     std::map<std::string, std::pair<std::string, int64_t>> idValueMap = {
1161         {"regular1", {"3000000000000000000", validInt64Num1}},
1162         {"max", {"9223372036854775807", INT64_MAX}},
1163         {"min", {"-9223372036854775808", INT64_MIN}},
1164         {"overMax", {"9223372036854775808", INT64_MAX}},
1165         {"underMin", {"-9223372036854775809", INT64_MIN}},
1166         {"regular2", {"3000000000000000001", validInt64Num2}}};
1167     Key metaKey;
1168     Value value;
1169     VirtualSingleVerSyncDBInterface storage;
1170     for (auto &pair : idValueMap) {
1171         std::string keyStr = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(pair.first);
1172         DBCommon::StringToVector(keyStr, metaKey);
1173         DBCommon::StringToVector(pair.second.first, value);
1174         EXPECT_EQ(storage.PutMetaData(metaKey, value, false), E_OK);
1175     }
1176     /**
1177      * @tc.steps: step2. call GetQueryLastTimestamp with different query id
1178      * @tc.expected: step2. get the correct return value
1179      */
1180     Metadata meta;
1181     int errCode = meta.Initialize(&storage);
1182     ASSERT_EQ(errCode, E_OK);
1183     for (auto &pair : idValueMap) {
1184         auto &queryId = pair.first;
1185         auto &expectVal = pair.second.second;
1186         EXPECT_EQ(meta.GetQueryLastTimestamp("any", queryId), static_cast<uint64_t>(expectVal));
1187     }
1188 }
1189 
1190 /**
1191  * @tc.name: MetaDataExceptionBranch001
 * @tc.desc: Test exception branch of meta data.
1193  * @tc.type: FUNC
1194  * @tc.require: AR000FN6G9
1195  * @tc.author: zhangshijie
1196  */
1197 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, MetaDataExceptionBranch001, TestSize.Level1)
1198 {
1199     /**
1200      * @tc.steps: step1. call GetRemoveDataMark with a device not in map
1201      * @tc.expected: step1. out value = 0
1202      */
1203     Metadata meta;
1204     VirtualSingleVerSyncDBInterface storage;
1205     int errCode = meta.Initialize(&storage);
1206     ASSERT_EQ(errCode, E_OK);
1207 
1208     uint64_t val = 99; // 99 is the initial value of outValue
1209     uint64_t outValue = val;
1210     meta.GetRemoveDataMark("D1", outValue);
1211     EXPECT_EQ(outValue, 0u);
1212 
1213     /**
1214      * @tc.steps: step2. reset outValue, call GetDbCreateTime with a device not in map
1215      * @tc.expected: step2. out value = 0
1216      */
1217     outValue = val;
1218     meta.GetDbCreateTime("D1", outValue);
1219     EXPECT_EQ(outValue, 0u);
1220 
1221     /**
1222      * @tc.steps: step3. call ResetMetaDataAfterRemoveData with a device not in map
1223      * @tc.expected: step3. return -E_NOT_FOUND
1224      */
1225     EXPECT_EQ(meta.ResetMetaDataAfterRemoveData("D1"), -E_NOT_FOUND);
1226 }
1227 
1228 /**
1229  * @tc.name: GetDeleteKeyWaterMark 001
1230  * @tc.desc: Test metaData save and get deleteWaterMark.
1231  * @tc.type: FUNC
1232  * @tc.require: AR000FN6G9
1233  * @tc.author: zhangqiquan
1234  */
1235 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteKeyWaterMark001, TestSize.Level1)
1236 {
1237     VirtualSingleVerSyncDBInterface storage;
1238     Metadata meta;
1239 
1240     /**
1241      * @tc.steps: step1. initialize meta with storage
1242      * @tc.expected: step1. E_OK
1243      */
1244     int errCode = meta.Initialize(&storage);
1245     ASSERT_EQ(errCode, E_OK);
1246 
1247     /**
1248      * @tc.steps: step2. save receive and send watermark
1249      * @tc.expected: step2. E_OK
1250      */
1251     WaterMark w1 = 1;
1252     EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", w1), E_OK);
1253     EXPECT_EQ(meta.SetSendDeleteSyncWaterMark("D1", w1), E_OK);
1254 
1255     /**
1256      * @tc.steps: step3. get receive and send watermark
1257      * @tc.expected: step3. E_OK and get the latest value
1258      */
1259     WaterMark w = 0;
1260     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w), E_OK);
1261     EXPECT_EQ(w1, w);
1262     EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", w), E_OK);
1263     EXPECT_EQ(w1, w);
1264 
1265     /**
1266      * @tc.steps: step4. set peer and local watermark
1267      * @tc.expected: step4. E_OK
1268      */
1269     WaterMark w2 = 2;
1270     EXPECT_EQ(meta.SaveLocalWaterMark("D1", w2), E_OK);
1271     EXPECT_EQ(meta.SavePeerWaterMark("D1", w2, true), E_OK);
1272 
1273     /**
1274      * @tc.steps: step5. get receive and send watermark
1275      * @tc.expected: step5. E_OK and get the w1
1276      */
1277     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w), E_OK);
1278     EXPECT_EQ(w2, w);
1279     EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", w), E_OK);
1280     EXPECT_EQ(w2, w);
1281 
1282     /**
1283      * @tc.steps: step6. set peer and local watermark
1284      * @tc.expected: step6. E_OK
1285      */
1286     WaterMark w3 = 3;
1287     EXPECT_EQ(meta.SaveLocalWaterMark("D2", w3), E_OK);
1288     EXPECT_EQ(meta.SavePeerWaterMark("D2", w3, true), E_OK);
1289 
1290     /**
1291      * @tc.steps: step7. get receive and send watermark
1292      * @tc.expected: step7. E_OK and get the w3
1293      */
1294     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D2", w), E_OK);
1295     EXPECT_EQ(w3, w);
1296     EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D2", w), E_OK);
1297     EXPECT_EQ(w3, w);
1298 
1299     /**
1300      * @tc.steps: step8. get not exit receive and send watermark
1301      * @tc.expected: step8. E_OK and get the 0
1302      */
1303     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D3", w), E_OK);
1304     EXPECT_EQ(w, 0u);
1305     EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D3", w), E_OK);
1306     EXPECT_EQ(w, 0u);
1307 }
1308 
1309 /**
1310  * @tc.name: GetDeleteKeyWaterMark 002
1311  * @tc.desc: Test metaData save and get deleteWaterMark after push or pull mode.
1312  * @tc.type: FUNC
1313  * @tc.require: AR000FN6G9
1314  * @tc.author: zhangqiquan
1315  */
1316 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteKeyWaterMark002, TestSize.Level1)
1317 {
1318     VirtualSingleVerSyncDBInterface storage;
1319     Metadata meta;
1320 
1321     /**
1322      * @tc.steps: step1. initialize meta with storage
1323      * @tc.expected: step1. E_OK
1324      */
1325     int errCode = meta.Initialize(&storage);
1326     ASSERT_EQ(errCode, E_OK);
1327 
1328     /**
1329      * @tc.steps: step2. set peer and local watermark
1330      * @tc.expected: step2. E_OK
1331      */
1332     WaterMark w1 = 3;
1333     EXPECT_EQ(meta.SaveLocalWaterMark("D1", w1), E_OK);
1334     EXPECT_EQ(meta.SavePeerWaterMark("D1", w1, true), E_OK);
1335 
1336     /**
1337      * @tc.steps: step2. save receive and send watermark
1338      * @tc.expected: step2. E_OK
1339      */
1340     WaterMark w2 = 1;
1341     EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", w2), E_OK);
1342     EXPECT_EQ(meta.SetSendDeleteSyncWaterMark("D1", w2), E_OK);
1343 
1344     /**
1345      * @tc.steps: step3. get receive and send watermark
1346      * @tc.expected: step3. E_OK and get the bigger value
1347      */
1348     WaterMark w = 0;
1349     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w), E_OK);
1350     EXPECT_EQ(w1, w);
1351     EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", w), E_OK);
1352     EXPECT_EQ(w1, w);
1353 }
1354 
1355 /**
1356  * @tc.name: ClearDeleteKeyWaterMark 001
1357  * @tc.desc: Test metaData clear watermark function.
1358  * @tc.type: FUNC
1359  * @tc.require: AR000FN6G9
1360  * @tc.author: zhangqiquan
1361  */
1362 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearDeleteKeyWaterMark001, TestSize.Level1)
1363 {
1364     VirtualSingleVerSyncDBInterface storage;
1365     Metadata meta;
1366 
1367     /**
1368      * @tc.steps: step1. initialize meta with storage
1369      * @tc.expected: step1. E_OK
1370      */
1371     int errCode = meta.Initialize(&storage);
1372     ASSERT_EQ(errCode, E_OK);
1373 
1374     /**
1375      * @tc.steps: step2. save receive watermark
1376      * @tc.expected: step2. E_OK
1377      */
1378     WaterMark w1 = 1;
1379     EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", w1), E_OK);
1380 
1381     /**
1382      * @tc.steps: step3. erase peer watermark
1383      * @tc.expected: step3. E_OK
1384      */
1385     EXPECT_EQ(meta.EraseDeviceWaterMark("D1", true), E_OK);
1386 
1387     /**
1388      * @tc.steps: step4. get receive watermark
1389      * @tc.expected: step4. E_OK receive watermark is zero
1390      */
1391     WaterMark w2 = -1;
1392     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w2), E_OK);
1393     EXPECT_EQ(w2, 0u);
1394 
1395     /**
1396      * @tc.steps: step5. set peer watermark
1397      * @tc.expected: step5. E_OK
1398      */
1399     WaterMark w3 = 2;
1400     EXPECT_EQ(meta.SavePeerWaterMark("D1", w3, true), E_OK);
1401 
1402     /**
1403      * @tc.steps: step6. get receive watermark
1404      * @tc.expected: step6. E_OK receive watermark is peer watermark
1405      */
1406     WaterMark w4 = -1;
1407     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w4), E_OK);
1408     EXPECT_EQ(w4, w3);
1409 }
1410 
1411 /**
1412  * @tc.name: VerifyCacheAndDb 001
1413  * @tc.desc: Test metaData watermark cache and db are consistent and correct.
1414  * @tc.type: FUNC
1415  * @tc.require: AR000FN6G9
1416  * @tc.author: zhangqiquan
1417  */
1418 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDataQuerySync001, TestSize.Level1)
1419 {
1420     Metadata meta;
1421     VirtualSingleVerSyncDBInterface storage;
1422 
1423     /**
1424      * @tc.steps: step1. initialize meta with storage
1425      * @tc.expected: step1. E_OK
1426      */
1427     int errCode = meta.Initialize(&storage);
1428     ASSERT_EQ(errCode, E_OK);
1429 
1430     const std::string deviceId = "D1";
1431     const std::string queryId = "Q1";
1432 
1433     /**
1434     * @tc.steps: step2. save deleteSync watermark
1435     * @tc.expected: step2. E_OK
1436     */
1437     WaterMark deleteWaterMark = 1;
1438     EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark(deviceId, deleteWaterMark), E_OK);
1439     EXPECT_EQ(meta.SetSendDeleteSyncWaterMark(deviceId, deleteWaterMark), E_OK);
1440 
1441     /**
1442     * @tc.steps: step3. save querySync watermark
1443     * @tc.expected: step2. E_OK
1444     */
1445     WaterMark queryWaterMark = 2;
1446     EXPECT_EQ(meta.SetRecvQueryWaterMark(queryId, deviceId, queryWaterMark), E_OK);
1447     EXPECT_EQ(meta.SetSendQueryWaterMark(queryId, deviceId, queryWaterMark), E_OK);
1448 
1449     /**
1450     * @tc.steps: step4. initialize meta with storage
1451     * @tc.expected: step4. E_OK
1452     */
1453     Metadata anotherMeta;
1454     ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1455 
1456     /**
1457     * @tc.steps: step5. verify delete sync data
1458     * @tc.expected: step5. E_OK and waterMark equal to deleteWaterMark
1459     */
1460     WaterMark waterMark;
1461     EXPECT_EQ(anotherMeta.GetRecvDeleteSyncWaterMark(deviceId, waterMark), E_OK);
1462     EXPECT_EQ(waterMark, deleteWaterMark);
1463     EXPECT_EQ(anotherMeta.GetSendDeleteSyncWaterMark(deviceId, waterMark), E_OK);
1464     EXPECT_EQ(waterMark, deleteWaterMark);
1465 
1466     /**
1467     * @tc.steps: step6. verify query sync data
1468     * @tc.expected: step6. E_OK and waterMark equal to queryWaterMark
1469     */
1470     EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark(queryId, deviceId, waterMark), E_OK);
1471     EXPECT_EQ(waterMark, queryWaterMark);
1472     EXPECT_EQ(anotherMeta.GetSendQueryWaterMark(queryId, deviceId, waterMark), E_OK);
1473     EXPECT_EQ(waterMark, queryWaterMark);
1474 }
1475 
1476 /**
1477  * @tc.name: VerifyLruMap 001
1478  * @tc.desc: Test metaData watermark cache lru ability.
1479  * @tc.type: FUNC
1480  * @tc.require: AR000FN6G9
1481  * @tc.author: zhangqiquan
1482  */
1483 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyLruMap001, TestSize.Level1)
1484 {
1485     LruMap<std::string, QueryWaterMark> lruMap;
1486     const int maxCacheItems = 200;
1487 
1488     /**
1489     * @tc.steps: step1. fill items to LruMap
1490     * @tc.expected: step1. E_OK
1491     */
1492     const int startCount = 0;
1493     for (int i = startCount; i < maxCacheItems; i++) {
1494         std::string key = std::to_string(i);
1495         QueryWaterMark value;
1496         value.recvWaterMark = static_cast<uint64_t>(i + 1);
1497         EXPECT_EQ(lruMap.Put(key, value), E_OK);
1498     }
1499 
1500     /**
1501     * @tc.steps: step2. get the first item
1502     * @tc.expected: step2. E_OK first item will move to last
1503     */
1504     std::string firstItemKey = std::to_string(startCount);
1505     QueryWaterMark firstItemValue;
1506     EXPECT_EQ(lruMap.Get(firstItemKey, firstItemValue), E_OK);
1507     EXPECT_EQ(firstItemValue.recvWaterMark, 1u);
1508 
1509     /**
1510     * @tc.steps: step3. insert new items to LruMap
1511     * @tc.expected: step3. the second items was removed
1512     */
1513     std::string key = std::to_string(maxCacheItems);
1514     QueryWaterMark value;
1515     value.recvWaterMark = maxCacheItems;
1516     EXPECT_EQ(lruMap.Put(key, value), E_OK);
1517 
1518     /**
1519     * @tc.steps: step4. get the second item
1520     * @tc.expected: step4. E_NOT_FOUND it was removed by algorithm
1521     */
1522     std::string secondItemKey = std::to_string(startCount + 1);
1523     QueryWaterMark secondItemValue;
1524     EXPECT_EQ(lruMap.Get(secondItemKey, secondItemValue), -E_NOT_FOUND);
1525     EXPECT_EQ(secondItemValue.recvWaterMark, 0u);
1526 }
1527 
1528 /**
1529  * @tc.name: VerifyMetaDataInit 001
1530  * @tc.desc: Test metaData init correctly
1531  * @tc.type: FUNC
1532  * @tc.require: AR000FN6G9
1533  * @tc.author: zhangqiquan
1534  */
1535 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDataInit001, TestSize.Level1)
1536 {
1537     Metadata meta;
1538     VirtualSingleVerSyncDBInterface storage;
1539 
1540     /**
1541     * @tc.steps: step1. initialize meta with storage
1542     * @tc.expected: step1. E_OK
1543     */
1544     ASSERT_EQ(meta.Initialize(&storage), E_OK);
1545 
1546     DeviceID deviceA = "DeviceA";
1547     DeviceID deviceB = "DeviceA";
1548     WaterMark setWaterMark = 1;
1549 
1550     /**
1551     * @tc.steps: step2. meta save and get waterMark
1552     * @tc.expected: step2. expect get the same waterMark
1553     */
1554     EXPECT_EQ(meta.SaveLocalWaterMark(deviceA, setWaterMark), E_OK);
1555     EXPECT_EQ(meta.SaveLocalWaterMark(deviceB, setWaterMark), E_OK);
1556     WaterMark getWaterMark = 0;
1557     meta.GetLocalWaterMark(deviceA, getWaterMark);
1558     EXPECT_EQ(getWaterMark, setWaterMark);
1559     meta.GetLocalWaterMark(deviceB, getWaterMark);
1560     EXPECT_EQ(getWaterMark, setWaterMark);
1561 
1562 
1563     /**
1564     * @tc.steps: step3. init again
1565     * @tc.expected: step3. E_OK
1566     */
1567     Metadata anotherMeta;
1568     ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1569 
1570     /**
1571     * @tc.steps: step4. get waterMark again
1572     * @tc.expected: step4. expect get the same waterMark
1573     */
1574     anotherMeta.GetLocalWaterMark(deviceA, getWaterMark);
1575     EXPECT_EQ(getWaterMark, setWaterMark);
1576     anotherMeta.GetLocalWaterMark(deviceB, getWaterMark);
1577     EXPECT_EQ(getWaterMark, setWaterMark);
1578 }
1579 
1580 namespace {
InitVerifyStorageEnvironment(Metadata & meta,VirtualSingleVerSyncDBInterface & storage,const std::string & deviceId,const int & startCount,const uint32_t & maxStoreItems)1581 void InitVerifyStorageEnvironment(Metadata &meta, VirtualSingleVerSyncDBInterface &storage,
1582     const std::string &deviceId, const int &startCount, const uint32_t &maxStoreItems)
1583 {
1584     /**
1585     * @tc.steps: step1. initialize meta with storage
1586     * @tc.expected: step1. E_OK
1587     */
1588     ASSERT_EQ(meta.Initialize(&storage), E_OK);
1589 
1590     /**
1591     * @tc.steps: step2. fill items to metadata
1592     * @tc.expected: step2. E_OK
1593     */
1594     for (uint32_t i = startCount; i < maxStoreItems; i++) {
1595         std::string queryId = std::to_string(i);
1596         WaterMark recvWaterMark = i + 1;
1597         EXPECT_EQ(meta.SetRecvQueryWaterMark(queryId, deviceId, recvWaterMark), E_OK);
1598     }
1599 }
1600 }
1601 
1602 /**
1603  * @tc.name: VerifyManagerQuerySyncStorage 001
1604  * @tc.desc: Test metaData remove least used querySync storage items.
1605  * @tc.type: FUNC
1606  * @tc.require: AR000FN6G9
1607  * @tc.author: zhangqiquan
1608  */
1609 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyManagerQuerySyncStorage001, TestSize.Level3)
1610 {
1611     Metadata meta;
1612     VirtualSingleVerSyncDBInterface storage;
1613     const uint32_t maxStoreItems = 100000;
1614     const int startCount = 0;
1615     const std::string deviceId = "Device";
1616 
1617     InitVerifyStorageEnvironment(meta, storage, deviceId, startCount, maxStoreItems);
1618 
1619     /**
1620     * @tc.steps: step3. insert new items to metadata
1621     * @tc.expected: step3. E_OK
1622     */
1623     std::string newQueryId = std::to_string(maxStoreItems);
1624     WaterMark newWaterMark = maxStoreItems + 1;
1625     EXPECT_EQ(meta.SetRecvQueryWaterMark(newQueryId, deviceId, newWaterMark), E_OK);
1626 
1627     /**
1628     * @tc.steps: step4. touch the first item
1629     * @tc.expected: step4. E_OK update first item used time
1630     */
1631     std::string firstItemKey = std::to_string(startCount);
1632     WaterMark firstWaterMark = 11u;
1633     EXPECT_EQ(meta.SetRecvQueryWaterMark(firstItemKey, deviceId, firstWaterMark), E_OK);
1634 
1635     /**
1636     * @tc.steps: step5. initialize new meta with storage
1637     * @tc.expected: step5. the second item will be removed
1638     */
1639     Metadata newMeta;
1640     ASSERT_EQ(newMeta.Initialize(&storage), E_OK);
1641 
1642     /**
1643     * @tc.steps: step6. touch the first item
1644     * @tc.expected: step6. E_OK it still exist
1645     */
1646     WaterMark exceptWaterMark;
1647     EXPECT_EQ(newMeta.GetRecvQueryWaterMark(firstItemKey, deviceId, exceptWaterMark), E_OK);
1648     EXPECT_EQ(exceptWaterMark, firstWaterMark);
1649 
1650     /**
1651     * @tc.steps: step7. get the second item
1652     * @tc.expected: step7. NOT_FOUND secondWaterMark is zero
1653     */
1654     WaterMark secondWaterMark;
1655     std::string secondQueryId = std::to_string(startCount + 1);
1656     EXPECT_EQ(newMeta.GetRecvQueryWaterMark(secondQueryId, deviceId, secondWaterMark), E_OK);
1657     EXPECT_EQ(secondWaterMark, 0u);
1658 }
1659 
1660 /**
1661  * @tc.name: VerifyMetaDbCreateTime 001
 * @tc.desc: Test metaData get and set dbCreateTime.
1663  * @tc.type: FUNC
1664  * @tc.require: AR000FN6G9
1665  * @tc.author: zhuwentao
1666  */
1667 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDbCreateTime001, TestSize.Level1)
1668 {
1669     Metadata meta;
1670     VirtualSingleVerSyncDBInterface storage;
1671     /**
1672      * @tc.steps: step1. initialize meta with storage
1673      * @tc.expected: step1. E_OK
1674      */
1675     int errCode = meta.Initialize(&storage);
1676     ASSERT_EQ(errCode, E_OK);
1677     /**
1678      * @tc.steps: step2. set local and peer watermark and dbCreateTime
1679      * @tc.expected: step4. E_OK
1680      */
1681     WaterMark value = 2;
1682     EXPECT_EQ(meta.SaveLocalWaterMark("D1", value), E_OK);
1683     EXPECT_EQ(meta.SavePeerWaterMark("D1", value, true), E_OK);
1684     EXPECT_EQ(meta.SetDbCreateTime("D1", 10u, true), E_OK);
1685     /**
1686      * @tc.steps: step3. check peer and local watermark and dbCreateTime
1687      * @tc.expected: step4. E_OK
1688      */
1689     WaterMark curValue = 0;
1690     meta.GetLocalWaterMark("D1", curValue);
1691     EXPECT_EQ(value, curValue);
1692     meta.GetPeerWaterMark("D1", curValue);
1693     EXPECT_EQ(value, curValue);
1694     uint64_t curDbCreatTime = 0;
1695     meta.GetDbCreateTime("D1", curDbCreatTime);
1696     EXPECT_EQ(curDbCreatTime, 10u);
1697     /**
1698      * @tc.steps: step3. change dbCreateTime and check
1699      * @tc.expected: step4. E_OK
1700      */
1701     EXPECT_EQ(meta.SetDbCreateTime("D1", 20u, true), E_OK);
1702     uint64_t clearDeviceDataMark = INT_MAX;
1703     meta.GetRemoveDataMark("D1", clearDeviceDataMark);
1704     EXPECT_EQ(clearDeviceDataMark, 1u);
1705     EXPECT_EQ(meta.ResetMetaDataAfterRemoveData("D1"), E_OK);
1706     meta.GetRemoveDataMark("D1", clearDeviceDataMark);
1707     EXPECT_EQ(clearDeviceDataMark, 0u);
1708     meta.GetDbCreateTime("D1", curDbCreatTime);
1709     EXPECT_EQ(curDbCreatTime, 20u);
1710 }
1711 
1712 /**
1713  * @tc.name: VerifyManagerQuerySyncStorage 002
1714  * @tc.desc: Test metaData remove least used querySync storage items when exit wrong data.
1715  * @tc.type: FUNC
1716  * @tc.require: AR000FN6G9
1717  * @tc.author: zhangqiquan
1718  */
1719 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyManagerQuerySyncStorage002, TestSize.Level3)
1720 {
1721     Metadata meta;
1722     VirtualSingleVerSyncDBInterface storage;
1723     const uint32_t maxStoreItems = 100000;
1724     const int startCount = 0;
1725     const std::string deviceId = "Device";
1726 
1727     InitVerifyStorageEnvironment(meta, storage, deviceId, startCount, maxStoreItems);
1728 
1729     /**
1730     * @tc.steps: step3. insert a wrong Value
1731     * @tc.expected: step3. E_OK
1732     */
1733     std::string newQueryId = std::to_string(maxStoreItems);
1734     Key dbKey;
1735     DBCommon::StringToVector(QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey()
1736         + DBCommon::TransferHashString(deviceId) + newQueryId, dbKey);
1737     Value wrongValue;
1738     EXPECT_EQ(storage.PutMetaData(dbKey, wrongValue, false), E_OK);
1739 
1740     /**
1741     * @tc.steps: step4. initialize new meta with storage
1742     * @tc.expected: step4. E_OK
1743     */
1744     Metadata newMeta;
1745     ASSERT_EQ(newMeta.Initialize(&storage), E_OK);
1746 
1747     /**
1748     * @tc.steps: step5. touch the first item
1749     * @tc.expected: step5. E_OK still exit
1750     */
1751     std::string firstItemKey = std::to_string(startCount);
1752     WaterMark exceptWaterMark;
1753     EXPECT_EQ(newMeta.GetRecvQueryWaterMark(firstItemKey, deviceId, exceptWaterMark), E_OK);
1754     EXPECT_EQ(exceptWaterMark, 1u);
1755 }
1756 
1757 /**
1758  * @tc.name: AllPredicateQuerySync001
1759  * @tc.desc: Test normal push sync for AllPredicate data.
1760  * @tc.type: FUNC
1761  * @tc.require: AR000FN6G9
1762  * @tc.author: zhuwentao
1763  */
1764 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync001, TestSize.Level1)
1765 {
1766     /**
1767      * @tc.steps: step1. InitSchemaDb
1768      */
1769     InitSchemaDb();
1770     DBStatus status = OK;
1771     std::vector<std::string> devices;
1772     devices.push_back(g_deviceB->GetDeviceId());
1773 
1774     /**
1775      * @tc.steps: step2. deviceA put {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1776                          {key21, SCHEMA_VALUE2} - {key29, SCHEMA_VALUE2}
1777      */
1778     Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1779     Value value2(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end());
1780     Key key = {'1'};
1781     Key key2 = {'2'};
1782     const int dataSize = 4000;
1783     for (int i = 0; i < dataSize; i++) {
1784         key.push_back(i);
1785         key2.push_back(i);
1786         status = g_schemaKvDelegatePtr->Put(key, value);
1787         ASSERT_TRUE(status == OK);
1788         status = g_schemaKvDelegatePtr->Put(key2, value2);
1789         ASSERT_TRUE(status == OK);
1790         key.pop_back();
1791         key2.pop_back();
1792     }
1793     ASSERT_TRUE(status == OK);
1794 
1795     /**
1796      * @tc.steps: step3. deviceA call query sync and wait
1797      * @tc.expected: step3. sync should return OK.
1798      */
1799     Query query = Query::Select().EqualTo("$.field_name1", 1);
1800     std::map<std::string, DBStatus> result;
1801     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1802     ASSERT_TRUE(status == OK);
1803 
1804     /**
1805      * @tc.expected: step4. onComplete should be called, DeviceB have {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1806      */
1807     ASSERT_TRUE(result.size() == devices.size());
1808     for (const auto &pair : result) {
1809         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1810         EXPECT_TRUE(pair.second == OK);
1811     }
1812     VirtualDataItem item;
1813     VirtualDataItem item2;
1814     for (int i = 0; i < dataSize; i++) {
1815         key.push_back(i);
1816         key2.push_back(i);
1817         g_deviceB->GetData(key, item);
1818         EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
1819         EXPECT_TRUE(item.value == value);
1820         key.pop_back();
1821         key2.pop_back();
1822     }
1823 }
1824 
1825 /**
1826  * @tc.name: AllPredicateQuerySync002
1827  * @tc.desc: Test wrong query param push sync for AllPredicate data.
1828  * @tc.type: FUNC
1829  * @tc.require: AR000FN6G9
1830  * @tc.author: zhuwentao
1831  */
1832 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync002, TestSize.Level1)
1833 {
1834     /**
1835      * @tc.steps: step1. InitSchemaDb
1836      */
1837     InitSchemaDb();
1838     DBStatus status = OK;
1839     std::vector<std::string> devices;
1840     devices.push_back(g_deviceB->GetDeviceId());
1841 
1842     /**
1843      * @tc.steps: step2. deviceA call query sync and wait
1844      * @tc.expected: step2. sync should return INVALID_QUERY_FIELD
1845      */
1846     Query query = Query::Select().GreaterThan("field_name11", 10);
1847     std::map<std::string, DBStatus> result;
1848     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1849     ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1850     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, query);
1851     ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1852     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
1853     ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1854 }
1855 
1856 /**
1857  * @tc.name: AllPredicateQuerySync003
1858  * @tc.desc: Test normal push sync for AllPredicate data with limit
1859  * @tc.type: FUNC
1860  * @tc.require: AR000FN6G9
1861  * @tc.author: zhuwentao
1862  */
1863 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync003, TestSize.Level1)
1864 {
1865     /**
1866      * @tc.steps: step1. InitSchemaDb
1867      */
1868     InitSchemaDb();
1869     DBStatus status = OK;
1870     std::vector<std::string> devices;
1871     devices.push_back(g_deviceB->GetDeviceId());
1872 
1873     /**
1874      * @tc.steps: step2. deviceA put {key1, SCHEMA_VALUE1} - {key9, SCHEMA_VALUE1}
1875      */
1876     Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1877     Value value2(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end());
1878     Key key = {'1'};
1879     Key key2 = {'2'};
1880     const int dataSize = 10;
1881     for (int i = 0; i < dataSize; i++) {
1882         key.push_back(i);
1883         key2.push_back(i);
1884         status = g_schemaKvDelegatePtr->Put(key, value);
1885         ASSERT_TRUE(status == OK);
1886         status = g_schemaKvDelegatePtr->Put(key2, value2);
1887         ASSERT_TRUE(status == OK);
1888         key.pop_back();
1889         key2.pop_back();
1890     }
1891     ASSERT_TRUE(status == OK);
1892 
1893     /**
1894      * @tc.steps: step3. deviceA call query sync with limit and wait
1895      * @tc.expected: step3. sync should return OK.
1896      */
1897     Query query = Query::Select().EqualTo("$.field_name1", 1).Limit(20, 0);
1898     std::map<std::string, DBStatus> result;
1899     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1900     ASSERT_TRUE(status == OK);
1901 
1902     /**
1903      * @tc.expected: step4. onComplete should be called, DeviceB have {key1, SCHEMA_VALUE1} - {key9, SCHEMA_VALUE1}
1904      */
1905     ASSERT_TRUE(result.size() == devices.size());
1906     for (const auto &pair : result) {
1907         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1908         EXPECT_TRUE(pair.second == OK);
1909     }
1910     VirtualDataItem item;
1911     VirtualDataItem item2;
1912     for (int i = 0; i < dataSize; i++) {
1913         key.push_back(i);
1914         key2.push_back(i);
1915         g_deviceB->GetData(key, item);
1916         EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
1917         EXPECT_TRUE(item.value == value);
1918         key.pop_back();
1919         key2.pop_back();
1920     }
1921 }
1922 
1923 /**
1924  * @tc.name: AllPredicateQuerySync004
1925  * @tc.desc: Test normal pull sync for AllPredicate data.
1926  * @tc.type: FUNC
1927  * @tc.require: AR000FN6G9
1928  * @tc.author: zhuwentao
1929  */
1930 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync004, TestSize.Level1)
1931 {
1932     /**
1933      * @tc.steps: step1. InitSchemaDb
1934      */
1935     InitSchemaDb();
1936     DBStatus status = OK;
1937     std::vector<std::string> devices;
1938     devices.push_back(g_deviceB->GetDeviceId());
1939 
1940     /**
1941      * @tc.steps: step2. deviceB put {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1942      */
1943     Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1944     Key key = {'1'};
1945     const int dataSize = 10;
1946     for (int i = 0; i < dataSize; i++) {
1947         key.push_back(i);
1948         g_deviceB->PutData(key, value, 10 + i, 0);
1949         ASSERT_TRUE(status == OK);
1950         key.pop_back();
1951     }
1952     ASSERT_TRUE(status == OK);
1953 
1954     /**
1955      * @tc.steps: step3. deviceA call query sync and wait
1956      * @tc.expected: step3. sync should return OK.
1957      */
1958     Query query = Query::Select().EqualTo("$.field_name1", 1);
1959     std::map<std::string, DBStatus> result;
1960     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
1961     ASSERT_TRUE(status == OK);
1962 
1963     /**
1964      * @tc.expected: step4. onComplete should be called, DeviceA have {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1965      */
1966     ASSERT_TRUE(result.size() == devices.size());
1967     for (const auto &pair : result) {
1968         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1969         EXPECT_TRUE(pair.second == OK);
1970     }
1971     Value item;
1972     Value item2;
1973     for (int i = 0; i < dataSize; i++) {
1974         key.push_back(i);
1975         g_schemaKvDelegatePtr->Get(key, item);
1976         EXPECT_TRUE(item == value);
1977         key.pop_back();
1978     }
1979 }