• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 #include <thread>
18 
19 #include "db_common.h"
20 #include "db_constant.h"
21 #include "distributeddb_data_generate_unit_test.h"
22 #include "distributeddb_tools_unit_test.h"
23 #include "generic_single_ver_kv_entry.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "platform_specific.h"
27 #include "query.h"
28 #include "query_sync_object.h"
29 #include "single_ver_data_sync.h"
30 #include "single_ver_serialize_manager.h"
31 #include "sync_types.h"
32 #include "virtual_communicator.h"
33 #include "virtual_communicator_aggregator.h"
34 #include "virtual_single_ver_sync_db_Interface.h"
35 
36 using namespace testing::ext;
37 using namespace DistributedDB;
38 using namespace DistributedDBUnitTest;
39 using namespace std;
40 
41 namespace {
42     string g_testDir;
43     const string STORE_ID = "kv_store_sync_test";
44     const string SCHEMA_STORE_ID = "kv_store_sync_schema_test";
45     const std::string DEVICE_B = "deviceB";
46 
47     KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
48     KvStoreDelegateManager g_schemaMgr(SCHEMA_APP_ID, USER_ID);
49     KvStoreConfig g_config;
50     DistributedDBToolsUnitTest g_tool;
51     DBStatus g_kvDelegateStatus = INVALID_ARGS;
52     DBStatus g_schemaKvDelegateStatus = INVALID_ARGS;
53     KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
54     KvStoreNbDelegate* g_schemaKvDelegatePtr = nullptr;
55     VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
56     KvVirtualDevice *g_deviceB = nullptr;
57 
58     // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
59     auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
60         placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
61     auto g_schemaKvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
62         placeholders::_1, placeholders::_2, std::ref(g_schemaKvDelegateStatus), std::ref(g_schemaKvDelegatePtr));
63     const string SCHEMA_STRING =
64     "{\"SCHEMA_VERSION\":\"1.0\","
65     "\"SCHEMA_MODE\":\"STRICT\","
66     "\"SCHEMA_DEFINE\":{"
67     "\"field_name1\":\"BOOL\","
68     "\"field_name2\":\"BOOL\","
69     "\"field_name3\":\"INTEGER, NOT NULL\","
70     "\"field_name4\":\"LONG, DEFAULT 100\","
71     "\"field_name5\":\"DOUBLE, NOT NULL, DEFAULT 3.14\","
72     "\"field_name6\":\"STRING, NOT NULL, DEFAULT '3.1415'\","
73     "\"field_name7\":\"LONG, DEFAULT 100\","
74     "\"field_name8\":\"LONG, DEFAULT 100\","
75     "\"field_name9\":\"LONG, DEFAULT 100\","
76     "\"field_name10\":\"LONG, DEFAULT 100\""
77     "},"
78     "\"SCHEMA_INDEXES\":[\"$.field_name1\", \"$.field_name2\"]}";
79 
80     const std::string SCHEMA_VALUE1 =
81     "{\"field_name1\":true,"
82     "\"field_name2\":false,"
83     "\"field_name3\":10,"
84     "\"field_name4\":20,"
85     "\"field_name5\":3.14,"
86     "\"field_name6\":\"3.1415\","
87     "\"field_name7\":100,"
88     "\"field_name8\":100,"
89     "\"field_name9\":100,"
90     "\"field_name10\":100}";
91 
92     const std::string SCHEMA_VALUE2 =
93     "{\"field_name1\":false,"
94     "\"field_name2\":true,"
95     "\"field_name3\":100,"
96     "\"field_name4\":200,"
97     "\"field_name5\":3.14,"
98     "\"field_name6\":\"3.1415\","
99     "\"field_name7\":100,"
100     "\"field_name8\":100,"
101     "\"field_name9\":100,"
102     "\"field_name10\":100}";
103 }
104 
105 class DistributedDBSingleVerP2PQuerySyncTest : public testing::Test {
106 public:
107     static void SetUpTestCase(void);
108     static void TearDownTestCase(void);
109     void SetUp();
110     void TearDown();
111 };
112 
SetUpTestCase(void)113 void DistributedDBSingleVerP2PQuerySyncTest::SetUpTestCase(void)
114 {
115     /**
116      * @tc.setup: Init datadir and Virtual Communicator.
117      */
118     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
119     string dir = g_testDir + "/single_ver";
120     DIR* dirTmp = opendir(dir.c_str());
121     if (dirTmp == nullptr) {
122         OS::MakeDBDirectory(dir);
123     } else {
124         closedir(dirTmp);
125     }
126 
127     g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
128     ASSERT_TRUE(g_communicatorAggregator != nullptr);
129     RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
130 }
131 
TearDownTestCase(void)132 void DistributedDBSingleVerP2PQuerySyncTest::TearDownTestCase(void)
133 {
134     /**
135      * @tc.teardown: Release virtual Communicator and clear data dir.
136      */
137     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
138         LOGE("rm test db files error!");
139     }
140     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
141 }
142 
SetUp(void)143 void DistributedDBSingleVerP2PQuerySyncTest::SetUp(void)
144 {
145     DistributedDBToolsUnitTest::PrintTestCaseInfo();
146     /**
147      * @tc.setup: create virtual device B and get a KvStoreNbDelegate as deviceA
148      */
149     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
150     ASSERT_TRUE(g_deviceB != nullptr);
151     VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
152     ASSERT_TRUE(syncInterfaceB != nullptr);
153     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
154 }
155 
TearDown(void)156 void DistributedDBSingleVerP2PQuerySyncTest::TearDown(void)
157 {
158     /**
159      * @tc.teardown: Release device A, B
160      */
161     if (g_kvDelegatePtr != nullptr) {
162         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
163         g_kvDelegatePtr = nullptr;
164         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
165         LOGD("delete kv store status %d", status);
166         ASSERT_TRUE(status == OK);
167     }
168     if (g_schemaKvDelegatePtr != nullptr) {
169         ASSERT_EQ(g_schemaMgr.CloseKvStore(g_schemaKvDelegatePtr), OK);
170         g_schemaKvDelegatePtr = nullptr;
171         DBStatus status = g_schemaMgr.DeleteKvStore(SCHEMA_STORE_ID);
172         LOGD("delete kv store status %d", status);
173         ASSERT_TRUE(status == OK);
174     }
175     if (g_deviceB != nullptr) {
176         delete g_deviceB;
177         g_deviceB = nullptr;
178     }
179     PermissionCheckCallbackV2 nullCallback;
180     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
181 }
182 
InitNormalDb()183 void InitNormalDb()
184 {
185     g_config.dataDir = g_testDir;
186     g_mgr.SetKvStoreConfig(g_config);
187     KvStoreNbDelegate::Option option;
188     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
189     ASSERT_TRUE(g_kvDelegateStatus == OK);
190     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
191 }
192 
InitSchemaDb()193 void InitSchemaDb()
194 {
195     g_config.dataDir = g_testDir;
196     g_schemaMgr.SetKvStoreConfig(g_config);
197     KvStoreNbDelegate::Option option;
198     option.schema = SCHEMA_STRING;
199     g_schemaMgr.GetKvStore(SCHEMA_STORE_ID, option, g_schemaKvDelegateCallback);
200     ASSERT_TRUE(g_schemaKvDelegateStatus == OK);
201     ASSERT_TRUE(g_schemaKvDelegatePtr != nullptr);
202     g_deviceB->SetSchema(option.schema);
203 }
204 
205 /**
206  * @tc.name: Normal Sync 001
207  * @tc.desc: Test normal push sync for keyprefix data.
208  * @tc.type: FUNC
209  * @tc.require:
210  * @tc.author: xushaohua
211  */
212 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync001, TestSize.Level1)
213 {
214     InitNormalDb();
215     DBStatus status = OK;
216     std::vector<std::string> devices;
217     devices.push_back(g_deviceB->GetDeviceId());
218 
219     /**
220      * @tc.steps: step1. deviceA put {k0, v0} - {k9, v9}
221      */
222     Key key = {'1'};
223     Value value = {'1'};
224     const int dataSize = 10;
225     for (int i = 0; i < dataSize; i++) {
226         key.push_back(i);
227         value.push_back(i);
228         status = g_kvDelegatePtr->Put(key, value);
229         ASSERT_TRUE(status == OK);
230         key.pop_back();
231         value.pop_back();
232     }
233     Key key2 = {'2'};
234     Value value2 = {'2'};
235     status = g_kvDelegatePtr->Put(key2, value2);
236     ASSERT_TRUE(status == OK);
237 
238     /**
239      * @tc.steps: step2. deviceA call query sync and wait
240      * @tc.expected: step2. sync should return OK.
241      */
242     Query query = Query::Select().PrefixKey(key);
243     std::map<std::string, DBStatus> result;
244     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
245     ASSERT_TRUE(status == OK);
246 
247     /**
248      * @tc.expected: step3. onComplete should be called, DeviceB have {k1,v1} - {k9, v9}
249      */
250     ASSERT_TRUE(result.size() == devices.size());
251     for (const auto &pair : result) {
252         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
253         EXPECT_TRUE(pair.second == OK);
254     }
255     VirtualDataItem item;
256     for (int i = 0; i < dataSize; i++) {
257         key.push_back(i);
258         value.push_back(i);
259         g_deviceB->GetData(key, item);
260         EXPECT_TRUE(item.value == value);
261         key.pop_back();
262         value.pop_back();
263     }
264     EXPECT_TRUE(g_deviceB->GetData(key2, item) != E_OK);
265 }
266 
267 /**
268  * @tc.name: Normal Sync 002
269  * @tc.desc: Test normal push sync for limit and offset.
270  * @tc.type: FUNC
271  * @tc.require:
272  * @tc.author: xushaohua
273  */
274 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync002, TestSize.Level1)
275 {
276     InitNormalDb();
277     DBStatus status = OK;
278     std::vector<std::string> devices;
279     devices.push_back(g_deviceB->GetDeviceId());
280 
281     /**
282      * @tc.steps: step1. deviceA put {k0, v0} - {k9, v9}
283      */
284     Key key = {'1'};
285     Value value = {'1'};
286     const int dataSize = 10;
287     for (int i = 0; i < dataSize; i++) {
288         key.push_back(i);
289         value.push_back(i);
290         status = g_kvDelegatePtr->Put(key, value);
291         ASSERT_TRUE(status == OK);
292         key.pop_back();
293         value.pop_back();
294     }
295 
296     /**
297      * @tc.steps: step2. deviceA call sync and wait
298      * @tc.expected: step2. sync should return OK.
299      */
300     const int limit = 5;
301     const int offset = 4;
302     Query query = Query::Select().PrefixKey(key).Limit(limit, offset);
303     std::map<std::string, DBStatus> result;
304     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
305     ASSERT_TRUE(status == OK);
306 
307     /**
308      * @tc.expected: step3. onComplete should be called, DeviceB have {k4,v4} {k8, v8}
309      */
310     ASSERT_TRUE(result.size() == devices.size());
311     for (const auto &pair : result) {
312         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
313         EXPECT_TRUE(pair.second == OK);
314     }
315 
316     VirtualDataItem item;
317     for (int i = limit - 1; i < limit + offset; i++) {
318         key.push_back(i);
319         value.push_back(i);
320         g_deviceB->GetData(key, item);
321         EXPECT_TRUE(item.value == value);
322         key.pop_back();
323         value.pop_back();
324     }
325 }
326 
327 /**
328  * @tc.name: Normal Sync 001
329  * @tc.desc: Test normal push_and_pull sync for keyprefix data.
330  * @tc.type: FUNC
331  * @tc.require:
332  * @tc.author: zhuwentao
333  */
334 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync003, TestSize.Level1)
335 {
336     InitNormalDb();
337     DBStatus status = OK;
338     std::vector<std::string> devices;
339     devices.push_back(g_deviceB->GetDeviceId());
340 
341     /**
342      * @tc.steps: step1. deviceA put {k, v}, {b, v}
343      */
344     Key key = {'1'};
345     Value value = {'1'};
346     const int dataSize = 10;
347     status = g_kvDelegatePtr->Put(key, value);
348     ASSERT_TRUE(status == OK);
349     Key key2 = {'2'};
350     Value value2 = {'2'};
351     status = g_kvDelegatePtr->Put(key2, value2);
352     ASSERT_TRUE(status == OK);
353 
354     /**
355      * @tc.steps: step2. deviceB put {b0, v0} - {b9, v9}, {c, v}
356      */
357     for (int i = 0; i < dataSize; i++) {
358         key2.push_back(i);
359         value2.push_back(i);
360         g_deviceB->PutData(key2, value2, 10 + i, 0);
361         key2.pop_back();
362         value2.pop_back();
363     }
364     Key key3 = {'3'};
365     Value value3 = {'3'};
366     g_deviceB->PutData(key3, value3, 20, 0);
367 
368     /**
369      * @tc.steps: step2. deviceA call query sync and wait
370      * @tc.expected: step2. sync should return OK.
371      */
372     Query query = Query::Select().PrefixKey(key2);
373     std::map<std::string, DBStatus> result;
374     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, query);
375     ASSERT_TRUE(status == OK);
376 
377     /**
378      * @tc.expected: step3. onComplete should be called, DeviceA have {b0, v0} - {b9, v9}, DeviceB have {b, v}
379      */
380     ASSERT_TRUE(result.size() == devices.size());
381     for (const auto &pair : result) {
382         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
383         EXPECT_TRUE(pair.second == OK);
384     }
385     VirtualDataItem item;
386     Value tmpValue;
387     for (int i = 0; i < dataSize; i++) {
388         key2.push_back(i);
389         value2.push_back(i);
390         g_kvDelegatePtr->Get(key2, tmpValue);
391         EXPECT_TRUE(tmpValue == value2);
392         key2.pop_back();
393         value2.pop_back();
394     }
395     EXPECT_TRUE(g_deviceB->GetData(key, item) != E_OK);
396     EXPECT_TRUE(g_deviceB->GetData(key2, item) == E_OK);
397     g_kvDelegatePtr->Get(key3, tmpValue);
398     EXPECT_TRUE(tmpValue != value3);
399 }
400 
401 /**
402  * @tc.name: Normal Sync 001
403  * @tc.desc: Test normal pull sync for keyprefix data.
404  * @tc.type: FUNC
405  * @tc.require:
406  * @tc.author: zhuwentao
407  */
408 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync004, TestSize.Level1)
409 {
410     InitNormalDb();
411     DBStatus status = OK;
412     std::vector<std::string> devices;
413     devices.push_back(g_deviceB->GetDeviceId());
414     /**
415      * @tc.steps: step1. deviceB put {k1, v1} - {k9, k9}, {b0, v0} - {b9, v9}
416      */
417     Key key = {'1'};
418     Value value = {'1'};
419     const int dataSize = 10;
420     Key key2 = {'2'};
421     Value value2 = {'2'};
422     vector<std::pair<Key, Value>> key1Vec;
423     vector<std::pair<Key, Value>> key2Vec;
424     for (int i = 0; i < dataSize; i++) {
425         Key tmpKey(key);
426         Value tmpValue(value);
427         tmpKey.push_back(i);
428         tmpValue.push_back(i);
429         key1Vec.push_back(pair<Key, Value> {tmpKey, tmpValue});
430     }
431     for (int i = 0; i < dataSize; i++) {
432         Key tmpKey(key2);
433         Value tmpValue(value2);
434         tmpKey.push_back(i);
435         tmpValue.push_back(i);
436         key2Vec.push_back(pair<Key, Value> {tmpKey, tmpValue});
437     }
438     for (int i = 0; i < dataSize; i++) {
439         g_deviceB->PutData(key2Vec[i].first, key2Vec[i].second, 20 + i, 0);
440         g_deviceB->PutData(key1Vec[i].first, key1Vec[i].second, 10 + i, 0);
441     }
442 
443     /**
444      * @tc.steps: step2. deviceA call query sync and wait
445      * @tc.expected: step2. sync should return OK.
446      */
447     Query query = Query::Select().PrefixKey(key2);
448     std::map<std::string, DBStatus> result;
449     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
450     ASSERT_TRUE(status == OK);
451 
452     /**
453      * @tc.expected: step3. onComplete should be called, DeviceA have {b0, v0} - {b9, v9}
454      */
455     ASSERT_TRUE(result.size() == devices.size());
456     for (const auto &pair : result) {
457         EXPECT_TRUE(pair.second == OK);
458     }
459     VirtualDataItem item;
460     Value tmpValue;
461     for (int i = 0; i < dataSize; i++) {
462         g_kvDelegatePtr->Get(key2Vec[i].first, tmpValue);
463         EXPECT_TRUE(tmpValue == key2Vec[i].second);
464         g_kvDelegatePtr->Get(key1Vec[i].first, tmpValue);
465         EXPECT_TRUE(tmpValue != key1Vec[i].second);
466     }
467 }
468 
469 /**
470  * @tc.name: NormalSync005
471  * @tc.desc: Test normal push sync for inkeys query.
472  * @tc.type: FUNC
473  * @tc.require:
474  * @tc.author: lidongwei
475  */
476 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync005, TestSize.Level1)
477 {
478     InitNormalDb();
479     std::vector<std::string> devices;
480     devices.push_back(g_deviceB->GetDeviceId());
481 
482     /**
483      * @tc.steps: step1. deviceA put K1-K5
484      */
485     ASSERT_EQ(g_kvDelegatePtr->PutBatch(
486         {{KEY_1, VALUE_1}, {KEY_2, VALUE_2}, {KEY_3, VALUE_3}, {KEY_4, VALUE_4}, {KEY_5, VALUE_5}}), OK);
487 
488     /**
489      * @tc.steps: step2. deviceA sync K2,K4 and wait
490      * @tc.expected: step2. sync should return OK.
491      */
492     Query query = Query::Select().InKeys({KEY_2, KEY_4});
493     std::map<std::string, DBStatus> result;
494     ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), OK);
495 
496     /**
497      * @tc.expected: step3. onComplete should be called.
498      */
499     ASSERT_EQ(result.size(), devices.size());
500     for (const auto &pair : result) {
501         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
502         EXPECT_EQ(pair.second, OK);
503     }
504 
505     /**
506      * @tc.steps: step4. deviceB have K2K4 and have no K1K3K5.
507      * @tc.expected: step4. sync should return OK.
508      */
509     VirtualDataItem item;
510     EXPECT_EQ(g_deviceB->GetData(KEY_2, item), E_OK);
511     EXPECT_EQ(item.value, VALUE_2);
512     EXPECT_EQ(g_deviceB->GetData(KEY_4, item), E_OK);
513     EXPECT_EQ(item.value, VALUE_4);
514     EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
515     EXPECT_EQ(g_deviceB->GetData(KEY_3, item), -E_NOT_FOUND);
516     EXPECT_EQ(g_deviceB->GetData(KEY_5, item), -E_NOT_FOUND);
517 
518     /**
519      * @tc.steps: step5. deviceA sync with invalid inkeys query
520      * @tc.expected: step5. sync failed and the rc is right.
521      */
522     query = Query::Select().InKeys({});
523     result.clear();
524     ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), INVALID_ARGS);
525 
526     std::set<Key> keys;
527     for (uint8_t i = 0; i < DBConstant::MAX_BATCH_SIZE + 1; i++) {
528         Key key = { i };
529         keys.emplace(key);
530     }
531     query = Query::Select().InKeys(keys);
532     result.clear();
533     ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), OVER_MAX_LIMITS);
534 
535     query = Query::Select().InKeys({{}});
536     result.clear();
537     ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), INVALID_ARGS);
538 }
539 
540 /**
541  * @tc.name: NormalSync006
542  * @tc.desc: Test normal push sync with query by 32 devices;
543  * @tc.type: FUNC
544  * @tc.require:
545  * @tc.author: zhuwentao
546  */
547 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync006, TestSize.Level1)
548 {
549     /**
550      * @tc.steps: step1. init db and 32 devices
551      */
552     InitNormalDb();
553     uint32_t syncDevCount = 32u;
554     std::vector<KvVirtualDevice *> virtualDeviceVec(syncDevCount, nullptr);
555     const std::string device = "deviceTmp_";
556     std::vector<std::string> devices;
557     bool isError = false;
558     for (uint32_t i = 0; i < syncDevCount; i++) {
559         std::string tmpDev = device + std::to_string(i);
560         virtualDeviceVec[i] = new (std::nothrow) KvVirtualDevice(tmpDev);
561         if (virtualDeviceVec[i] == nullptr) {
562             isError = true;
563             break;
564         }
565         VirtualSingleVerSyncDBInterface *tmpSyncInterface = new (std::nothrow) VirtualSingleVerSyncDBInterface();
566         if (tmpSyncInterface == nullptr) {
567             isError = true;
568             break;
569         }
570         ASSERT_EQ(virtualDeviceVec[i]->Initialize(g_communicatorAggregator, tmpSyncInterface), E_OK);
571         devices.push_back(virtualDeviceVec[i]->GetDeviceId());
572     }
573     if (isError) {
574         for (uint32_t i = 0; i < syncDevCount; i++) {
575             if (virtualDeviceVec[i] != nullptr) {
576                 delete virtualDeviceVec[i];
577                 virtualDeviceVec[i] = nullptr;
578             }
579         }
580         ASSERT_TRUE(false);
581     }
582     /**
583      * @tc.steps: step2. deviceA put {k0, v0}
584      */
585     Key key = {'1'};
586     Value value = {'1'};
587     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
588     /**
589      * @tc.steps: step3. deviceA call query sync and wait
590      * @tc.expected: step3. sync should return OK.
591      */
592     Query query = Query::Select().PrefixKey(key);
593     std::map<std::string, DBStatus> result;
594     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query) == OK);
595 
596     /**
597      * @tc.expected: step3. onComplete should be called, DeviceB have {k1,v1} - {k9, v9}
598      */
599     ASSERT_TRUE(result.size() == devices.size());
600     for (const auto &pair : result) {
601         EXPECT_TRUE(pair.second == OK);
602     }
603     VirtualDataItem item;
604     for (uint32_t i = 0; i < syncDevCount; i++) {
605         EXPECT_TRUE(virtualDeviceVec[i]->GetData(key, item) == E_OK);
606         EXPECT_EQ(item.value, value);
607         delete virtualDeviceVec[i];
608         virtualDeviceVec[i] = nullptr;
609     }
610 }
611 
612 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, QueryRequestPacketTest001, TestSize.Level1)
613 {
614     /**
615      * @tc.steps: step1. prepare a QuerySyncRequestPacket.
616      */
617     auto packet = new (std::nothrow) DataRequestPacket;
618     ASSERT_TRUE(packet != nullptr);
619     auto kvEntry = new (std::nothrow) GenericSingleVerKvEntry;
620     ASSERT_TRUE(kvEntry != nullptr);
621     kvEntry->SetTimestamp(1);
622     SyncEntry syncData {.entries = {kvEntry}};
623 #ifndef OMIT_ZLIB
624     ASSERT_TRUE(GenericSingleVerKvEntry::Compress(syncData.entries, syncData.compressedEntries,
625         {CompressAlgorithm::ZLIB, SOFTWARE_VERSION_CURRENT}) == E_OK);
626     packet->SetCompressAlgo(CompressAlgorithm::ZLIB);
627     packet->SetFlag(4); // set IS_COMPRESS_DATA flag true
628 #endif
629     packet->SetBasicInfo(-E_NOT_SUPPORT, SOFTWARE_VERSION_CURRENT, SyncModeType::QUERY_PUSH_PULL);
630     packet->SetData(syncData.entries);
631     packet->SetCompressData(syncData.compressedEntries);
632     packet->SetEndWaterMark(INT8_MAX);
633     packet->SetWaterMark(INT16_MAX, INT32_MAX, INT64_MAX);
634     QuerySyncObject syncQuery(Query::Select().PrefixKey({'2'}));
635     packet->SetQuery(syncQuery);
636     packet->SetQueryId(syncQuery.GetIdentify());
637     packet->SetReserved(std::vector<uint64_t> {INT8_MAX});
638 
639     /**
640      * @tc.steps: step2. put the QuerySyncRequestPacket into a message.
641      */
642     Message msg;
643     msg.SetExternalObject(packet);
644     msg.SetMessageId(QUERY_SYNC_MESSAGE);
645     msg.SetMessageType(TYPE_REQUEST);
646 
647     /**
648      * @tc.steps: step3. Serialization the message to a buffer.
649      */
650     int len = static_cast<int>(SingleVerSerializeManager::CalculateLen(&msg));
651     vector<uint8_t> buffer(len);
652     ASSERT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), &msg), E_OK);
653 
654     /**
655      * @tc.steps: step4. DeSerialization the buffer to a message.
656      */
657     Message outMsg(QUERY_SYNC_MESSAGE);
658     outMsg.SetMessageType(TYPE_REQUEST);
659     ASSERT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), &outMsg), E_OK);
660 
661     /**
662      * @tc.steps: step5. checkout the outMsg.
663      * @tc.expected: step5. outMsg equal the the in msg
664      */
665     auto outPacket = outMsg.GetObject<DataRequestPacket>();
666     EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
667     EXPECT_EQ(outPacket->GetMode(), SyncModeType::QUERY_PUSH_PULL);
668     EXPECT_EQ(outPacket->GetEndWaterMark(), static_cast<uint64_t>(INT8_MAX));
669     EXPECT_EQ(outPacket->GetLocalWaterMark(), static_cast<uint64_t>(INT16_MAX));
670     EXPECT_EQ(outPacket->GetPeerWaterMark(), static_cast<uint64_t>(INT32_MAX));
671     EXPECT_EQ(outPacket->GetDeletedWaterMark(), static_cast<uint64_t>(INT64_MAX));
672 #ifndef OMIT_ZLIB
673     EXPECT_EQ(outPacket->GetFlag(), static_cast<uint32_t>(4)); // check IS_COMPRESS_DATA flag true
674 #endif
675     EXPECT_EQ(outPacket->GetQueryId(), syncQuery.GetIdentify());
676     EXPECT_EQ(outPacket->GetReserved(), std::vector<uint64_t> {INT8_MAX});
677     EXPECT_EQ(outPacket->GetSendCode(), -E_NOT_SUPPORT);
678     EXPECT_EQ(outPacket->GetData()[0]->GetTimestamp(), 1u);
679 }
680 
681 /**
682  * @tc.name: QueryRequestPacketTest002
683  * @tc.desc: Test exception branch of serialization.
684  * @tc.type: FUNC
685  * @tc.require:
686  * @tc.author: zhangshijie
687  */
688 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, SerializationManager001, TestSize.Level1)
689 {
690     /**
691      * @tc.steps: step1. call SingleVerSerializeManager::Serialization with buffer = nullptr or msg = nullptr
692      * @tc.expected:step1 return -E_MESSAGE_ID_ERROR
693      */
694     Message msg;
695     msg.SetMessageType(TYPE_INVALID);
696     vector<uint8_t> buffer(10); // 10 is test buffer len
697     EXPECT_EQ(SingleVerSerializeManager::Serialization(nullptr, buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
698     EXPECT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), nullptr), -E_MESSAGE_ID_ERROR);
699 
700     /**
701      * @tc.steps: step2. call SingleVerSerializeManager::Serialization with invalid type message
702      * @tc.expected:step2 return -E_MESSAGE_ID_ERROR
703      */
704     EXPECT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
705 
706     /**
707      * @tc.steps: step3. call SingleVerSerializeManager::DeSerialization with buffer = nullptr or msg = nullptr
708      * @tc.expected:step3 return -E_MESSAGE_ID_ERROR
709      */
710     EXPECT_EQ(SingleVerSerializeManager::DeSerialization(nullptr, buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
711     EXPECT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), nullptr), -E_MESSAGE_ID_ERROR);
712 
713     /**
714      * @tc.steps: step4. call SingleVerSerializeManager::DeSerialization with invalid type message
715      * @tc.expected:step4 return -E_MESSAGE_ID_ERROR
716      */
717     EXPECT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
718 }
719 
720 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, QueryAckPacketTest001, TestSize.Level1)
721 {
722     /**
723      * @tc.steps: step1. prepare a QuerySyncAckPacket.
724      */
725     DataAckPacket packet;
726     packet.SetVersion(SOFTWARE_VERSION_CURRENT);
727     packet.SetData(INT64_MAX);
728     packet.SetRecvCode(-E_NOT_SUPPORT);
729     std::vector<uint64_t> reserved = {INT8_MAX};
730     packet.SetReserved(reserved);
731 
732     /**
733      * @tc.steps: step2. put the QuerySyncAckPacket into a message.
734      */
735     Message msg;
736     msg.SetCopiedObject(packet);
737     msg.SetMessageId(QUERY_SYNC_MESSAGE);
738     msg.SetMessageType(TYPE_RESPONSE);
739 
740     /**
741      * @tc.steps: step3. Serialization the message to a buffer.
742      */
743     int len = static_cast<int>(SingleVerSerializeManager::CalculateLen(&msg));
744     LOGE("test leng = %d", len);
745     uint8_t *buffer = new (nothrow) uint8_t[len];
746     ASSERT_TRUE(buffer != nullptr);
747     int errCode = SingleVerSerializeManager::Serialization(buffer, len, &msg);
748     ASSERT_EQ(errCode, E_OK);
749 
750     /**
751      * @tc.steps: step4. DeSerialization the buffer to a message.
752      */
753     Message outMsg;
754     outMsg.SetMessageId(QUERY_SYNC_MESSAGE);
755     outMsg.SetMessageType(TYPE_RESPONSE);
756     errCode = SingleVerSerializeManager::DeSerialization(buffer, len, &outMsg);
757     ASSERT_EQ(errCode, E_OK);
758 
759     /**
760      * @tc.steps: step5. checkout the outMsg.
761      * @tc.expected: step5. outMsg equal the the in msg
762      */
763     auto outPacket = outMsg.GetObject<DataAckPacket>();
764     EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
765     EXPECT_EQ(outPacket->GetData(), static_cast<uint64_t>(INT64_MAX));
766     std::vector<uint64_t> reserved2 = {INT8_MAX};
767     EXPECT_EQ(outPacket->GetReserved(), reserved2);
768     EXPECT_EQ(outPacket->GetRecvCode(), -E_NOT_SUPPORT);
769     delete[] buffer;
770 }
771 
772 /**
773  * @tc.name: GetQueryWaterMark 001
774  * @tc.desc: Test metaData save and get queryWaterMark.
775  * @tc.type: FUNC
776  * @tc.require:
777  * @tc.author: zhangqiquan
778  */
779 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark001, TestSize.Level1)
780 {
781     VirtualSingleVerSyncDBInterface storage;
782     Metadata meta;
783 
784     /**
785      * @tc.steps: step1. initialize meta with storage
786      * @tc.expected: step1. E_OK
787      */
788     int errCode = meta.Initialize(&storage);
789     ASSERT_EQ(errCode, E_OK);
790 
791     /**
792      * @tc.steps: step2. save receive and send watermark
793      * @tc.expected: step2. E_OK
794      */
795     WaterMark w1 = 1;
796     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", "", w1), E_OK);
797     EXPECT_EQ(meta.SetSendQueryWaterMark("Q1", "D1", "", w1), E_OK);
798 
799     /**
800      * @tc.steps: step3. get receive and send watermark
801      * @tc.expected: step3. E_OK and get the latest value
802      */
803     WaterMark w = 0;
804     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", "", w), E_OK);
805     EXPECT_EQ(w1, w);
806     EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", "", w), E_OK);
807     EXPECT_EQ(w1, w);
808 
809     /**
810      * @tc.steps: step4. set peer and local watermark
811      * @tc.expected: step4. E_OK
812      */
813     WaterMark w2 = 2;
814     EXPECT_EQ(meta.SaveLocalWaterMark("D1", "", w2), E_OK);
815     EXPECT_EQ(meta.SavePeerWaterMark("D1", "", w2, true), E_OK);
816 
817     /**
818      * @tc.steps: step5. get receive and send watermark
819      * @tc.expected: step5. E_OK and get the w1
820      */
821     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", "", w), E_OK);
822     EXPECT_EQ(w2, w);
823     EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", "", w), E_OK);
824     EXPECT_EQ(w2, w);
825 
826     /**
827      * @tc.steps: step6. set peer and local watermark
828      * @tc.expected: step6. E_OK
829      */
830     WaterMark w3 = 3;
831     EXPECT_EQ(meta.SaveLocalWaterMark("D2", "", w3), E_OK);
832     EXPECT_EQ(meta.SavePeerWaterMark("D2", "", w3, true), E_OK);
833 
834     /**
835      * @tc.steps: step7. get receive and send watermark
836      * @tc.expected: step7. E_OK and get the w3
837      */
838     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q2", "D2", "", w), E_OK);
839     EXPECT_EQ(w3, w);
840     EXPECT_EQ(meta.GetSendQueryWaterMark("Q2", "D2", "", w), E_OK);
841     EXPECT_EQ(w3, w);
842 
843     /**
844      * @tc.steps: step8. get not exit receive and send watermark
845      * @tc.expected: step8. E_OK and get the 0
846      */
847     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q3", "D3", "", w), E_OK);
848     EXPECT_EQ(w, 0u);
849     EXPECT_EQ(meta.GetSendQueryWaterMark("Q3", "D3", "", w), E_OK);
850     EXPECT_EQ(w, 0u);
851 }
852 
853 /**
854  * @tc.name: GetQueryWaterMark 002
855  * @tc.desc: Test metaData save and get queryWaterMark after push or pull mode.
856  * @tc.type: FUNC
857  * @tc.require:
858  * @tc.author: zhangqiquan
859  */
860 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark002, TestSize.Level1)
861 {
862     VirtualSingleVerSyncDBInterface storage;
863     Metadata meta;
864 
865     /**
866      * @tc.steps: step1. initialize meta with storage
867      * @tc.expected: step1. E_OK
868      */
869     int errCode = meta.Initialize(&storage);
870     ASSERT_EQ(errCode, E_OK);
871 
872     /**
873      * @tc.steps: step2. set peer and local watermark
874      * @tc.expected: step2. E_OK
875      */
876     WaterMark w1 = 2;
877     EXPECT_EQ(meta.SaveLocalWaterMark("D1", "", w1), E_OK);
878     EXPECT_EQ(meta.SavePeerWaterMark("D1", "", w1, true), E_OK);
879 
880     /**
881      * @tc.steps: step2. save receive and send watermark
882      * @tc.expected: step2. E_OK
883      */
884     WaterMark w2 = 1;
885     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", "", w2), E_OK);
886     EXPECT_EQ(meta.SetSendQueryWaterMark("Q1", "D1", "", w2), E_OK);
887 
888     /**
889      * @tc.steps: step3. get receive and send watermark
890      * @tc.expected: step3. E_OK and get the bigger value
891      */
892     WaterMark w = 0;
893     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", "", w), E_OK);
894     EXPECT_EQ(w1, w);
895     EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", "", w), E_OK);
896     EXPECT_EQ(w1, w);
897 }
898 
899 /**
900  * @tc.name: GetQueryWaterMark 003
901  * @tc.desc: check time offset after remove water mark
902  * @tc.type: FUNC
903  * @tc.require:
904  * @tc.author: lianhuix
905  */
906 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark003, TestSize.Level1)
907 {
908     VirtualSingleVerSyncDBInterface storage;
909     Metadata meta;
910 
911     int errCode = meta.Initialize(&storage);
912     ASSERT_EQ(errCode, E_OK);
913 
914     const std::string DEVICE_B = "DEVICE_B";
915     TimeOffset offset = 100; // 100: offset
916     meta.SaveTimeOffset(DEVICE_B, "", offset);
917 
918     WaterMark w1 = 2; // 2: watermark
919     meta.SavePeerWaterMark(DBCommon::TransferHashString(DEVICE_B), "", w1, false);
920 
921     TimeOffset offsetGot;
922     meta.GetTimeOffset(DEVICE_B, "", offsetGot);
923     EXPECT_EQ(offsetGot, offset);
924 }
925 
926 /**
927  * @tc.name: GetDeleteWaterMark001
928  * @tc.desc: Test metaData save and get deleteWaterMark.
929  * @tc.type: FUNC
930  * @tc.require:
931  * @tc.author: zhangqiquan
932  */
933 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteWaterMark001, TestSize.Level1)
934 {
935     VirtualSingleVerSyncDBInterface storage;
936     Metadata meta;
937 
938     /**
939      * @tc.steps: step1. initialize meta with storage
940      * @tc.expected: step1. E_OK
941      */
942     int errCode = meta.Initialize(&storage);
943     ASSERT_EQ(errCode, E_OK);
944 
945     /**
946      * @tc.steps: step2. set and get recv/send delete watermark
947      * @tc.expected: step2. set E_OK and get water mark is equal with last set
948      */
949     const std::string device = "DEVICE";
950     const WaterMark maxWaterMark = 1000u;
__anonfb641fa10202() 951     std::thread recvThread([&meta, &device, &maxWaterMark]() {
952         for (WaterMark expectRecv = 0u; expectRecv < maxWaterMark; ++expectRecv) {
953             EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark(device, "", expectRecv), E_OK);
954             WaterMark actualRecv = 0u;
955             EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark(device, "", actualRecv), E_OK);
956             EXPECT_EQ(actualRecv, expectRecv);
957         }
958     });
__anonfb641fa10302() 959     std::thread sendThread([&meta, &device, &maxWaterMark]() {
960         for (WaterMark expectSend = 0u; expectSend < maxWaterMark; ++expectSend) {
961             EXPECT_EQ(meta.SetSendDeleteSyncWaterMark(device, "", expectSend), E_OK);
962             WaterMark actualSend = 0u;
963             EXPECT_EQ(meta.GetSendDeleteSyncWaterMark(device, "", actualSend), E_OK);
964             EXPECT_EQ(actualSend, expectSend);
965         }
966     });
967     recvThread.join();
968     sendThread.join();
969 }
970 
971 /**
972  * @tc.name: ClearQueryWaterMark 001
973  * @tc.desc: Test metaData clear watermark function.
974  * @tc.type: FUNC
975  * @tc.require:
976  * @tc.author: zhangqiquan
977  */
978 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark001, TestSize.Level1)
979 {
980     VirtualSingleVerSyncDBInterface storage;
981     Metadata meta;
982 
983     /**
984      * @tc.steps: step1. initialize meta with storage
985      * @tc.expected: step1. E_OK
986      */
987     int errCode = meta.Initialize(&storage);
988     ASSERT_EQ(errCode, E_OK);
989 
990     /**
991      * @tc.steps: step2. save receive watermark
992      * @tc.expected: step2. E_OK
993      */
994     WaterMark w1 = 1;
995     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", "", w1), E_OK);
996 
997     /**
998      * @tc.steps: step3. erase peer watermark
999      * @tc.expected: step3. E_OK
1000      */
1001     EXPECT_EQ(meta.EraseDeviceWaterMark("D1", true), E_OK);
1002 
1003     /**
1004      * @tc.steps: step4. get receive watermark
1005      * @tc.expected: step4. E_OK receive watermark is zero
1006      */
1007     WaterMark w2 = -1;
1008     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", "", w2), E_OK);
1009     EXPECT_EQ(w2, 0u);
1010 
1011     /**
1012      * @tc.steps: step5. set peer watermark
1013      * @tc.expected: step5. E_OK
1014      */
1015     WaterMark w3 = 2;
1016     EXPECT_EQ(meta.SavePeerWaterMark("D1", "", w3, true), E_OK);
1017 
1018     /**
1019      * @tc.steps: step6. get receive watermark
1020      * @tc.expected: step6. E_OK receive watermark is peer watermark
1021      */
1022     WaterMark w4 = -1;
1023     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", "", w4), E_OK);
1024     EXPECT_EQ(w4, w3);
1025 }
1026 
1027 /**
1028  * @tc.name: ClearQueryWaterMark 002
1029  * @tc.desc: Test metaData clear watermark function.
1030  * @tc.type: FUNC
1031  * @tc.require:
1032  * @tc.author: zhangqiquan
1033  */
1034 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark002, TestSize.Level1)
1035 {
1036     VirtualSingleVerSyncDBInterface storage;
1037     Metadata meta;
1038 
1039     /**
1040      * @tc.steps: step1. initialize meta with storage
1041      * @tc.expected: step1. E_OK
1042      */
1043     int errCode = meta.Initialize(&storage);
1044     ASSERT_EQ(errCode, E_OK);
1045 
1046     /**
1047      * @tc.steps: step2. save receive watermark
1048      * @tc.expected: step2. E_OK
1049      */
1050     WaterMark w1 = 1;
1051     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", "", w1), E_OK);
1052     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q2", "D1", "", w1), E_OK);
1053     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D2", "", w1), E_OK);
1054 
1055     /**
1056      * @tc.steps: step3. erase peer watermark, make sure data remove in db
1057      * @tc.expected: step3. E_OK
1058      */
1059     Metadata anotherMeta;
1060     ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1061     EXPECT_EQ(anotherMeta.EraseDeviceWaterMark("D1", true), E_OK);
1062 
1063     /**
1064      * @tc.steps: step4. get receive watermark
1065      * @tc.expected: step4. E_OK receive watermark is zero
1066      */
1067     WaterMark w2 = -1;
1068     EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q1", "D1", "", w2), E_OK);
1069     EXPECT_EQ(w2, 0u);
1070     w2 = -1;
1071     EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q2", "D1", "", w2), E_OK);
1072     EXPECT_EQ(w2, 0u);
1073     w2 = -1;
1074     EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q1", "D2", "", w2), E_OK);
1075     EXPECT_EQ(w2, w1);
1076 }
1077 
1078 /**
1079  * @tc.name: ClearQueryWaterMark 003
1080  * @tc.desc: Test metaData clear watermark busy.
1081  * @tc.type: FUNC
1082  * @tc.require:
1083  * @tc.author: zhangqiquan
1084  */
1085 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark003, TestSize.Level1)
1086 {
1087     VirtualSingleVerSyncDBInterface storage;
1088     Metadata meta;
1089     /**
1090      * @tc.steps: step1. initialize meta with storage
1091      * @tc.expected: step1. E_OK
1092      */
1093     int errCode = meta.Initialize(&storage);
1094     ASSERT_EQ(errCode, E_OK);
1095     /**
1096      * @tc.steps: step2. set busy and erase water mark
1097      * @tc.expected: step2. -E_BUSY
1098      */
1099     storage.SetBusy(false, true);
1100     EXPECT_EQ(meta.EraseDeviceWaterMark("DEVICE_ID", true), -E_BUSY);
1101 }
1102 
1103 /**
1104  * @tc.name: GetQueryLastTimestamp001
1105  * @tc.desc: Test function of GetQueryLastTimestamp.
1106  * @tc.type: FUNC
1107  * @tc.require:
1108  * @tc.author: zhangshijie
1109  */
1110 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryLastTimestamp001, TestSize.Level1)
1111 {
1112     /**
1113      * @tc.steps: step1. initialize meta with nullptr
1114      * @tc.expected: step1. return -E_INVALID_DB
1115      */
1116     Metadata meta;
1117     EXPECT_EQ(meta.Initialize(nullptr), -E_INVALID_DB);
1118 
1119     /**
1120      * @tc.steps: step2. initialize meta with storage
1121      * @tc.expected: step2. E_OK
1122      */
1123     VirtualSingleVerSyncDBInterface storage;
1124     int errCode = meta.Initialize(&storage);
1125     ASSERT_EQ(errCode, E_OK);
1126 
1127     /**
1128      * @tc.steps: step3. call GetQueryLastTimestamp with a non-exists device
1129      * @tc.expected: step3. return INT64_MAX
1130      */
1131     EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q1"), static_cast<uint64_t>(INT64_MAX));
1132 
1133     /**
1134      * @tc.steps: step4. call GetQueryLastTimestamp with device D1 again
1135      * @tc.expected: step4. return 0
1136      */
1137     EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q1"), 0u);
1138 
1139     /**
1140      * @tc.steps: step5. call GetQueryLastTimestamp with device D1 and Q2
1141      * @tc.expected: step5. return INT64_MAX
1142      */
1143     EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q2"), static_cast<uint64_t>(INT64_MAX));
1144 }
1145 
1146 /**
1147  * @tc.name: GetQueryLastTimestamp002
1148  * @tc.desc: Test Metadata::GetQueryLastTimestamp when timestamp out of INT64 range.
1149  * @tc.type: FUNC
1150  * @tc.author: liuhongyang
1151  */
1152 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryLastTimestamp002, TestSize.Level1)
1153 {
1154     /**
1155      * @tc.steps: step1. initialize storage with fake meta data
1156      * @tc.expected: step1. E_OK
1157      */
1158     const int64_t validInt64Num1 = 3000000000000000000; // a random valid int64 number
1159     const int64_t validInt64Num2 = 3000000000000000001; // a random valid int64 number
1160     // key is queryId, value is: [stored timestamp in db, expect return value of GetQueryLastTimestamp]
1161     std::map<std::string, std::pair<std::string, int64_t>> idValueMap = {
1162         {"regular1", {"3000000000000000000", validInt64Num1}},
1163         {"max", {"9223372036854775807", INT64_MAX}},
1164         {"min", {"-9223372036854775808", INT64_MIN}},
1165         {"overMax", {"9223372036854775808", INT64_MAX}},
1166         {"underMin", {"-9223372036854775809", INT64_MIN}},
1167         {"regular2", {"3000000000000000001", validInt64Num2}}};
1168     Key metaKey;
1169     Value value;
1170     VirtualSingleVerSyncDBInterface storage;
1171     for (auto &pair : idValueMap) {
1172         std::string keyStr = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(pair.first);
1173         DBCommon::StringToVector(keyStr, metaKey);
1174         DBCommon::StringToVector(pair.second.first, value);
1175         EXPECT_EQ(storage.PutMetaData(metaKey, value, false), E_OK);
1176     }
1177     /**
1178      * @tc.steps: step2. call GetQueryLastTimestamp with different query id
1179      * @tc.expected: step2. get the correct return value
1180      */
1181     Metadata meta;
1182     int errCode = meta.Initialize(&storage);
1183     ASSERT_EQ(errCode, E_OK);
1184     for (auto &pair : idValueMap) {
1185         auto &queryId = pair.first;
1186         auto &expectVal = pair.second.second;
1187         EXPECT_EQ(meta.GetQueryLastTimestamp("any", queryId), static_cast<uint64_t>(expectVal));
1188     }
1189 }
1190 
1191 /**
1192  * @tc.name: MetaDataExceptionBranch001
1193  * @tc.desc: Test execption branch of meata data.
1194  * @tc.type: FUNC
1195  * @tc.require:
1196  * @tc.author: zhangshijie
1197  */
1198 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, MetaDataExceptionBranch001, TestSize.Level1)
1199 {
1200     /**
1201      * @tc.steps: step1. call GetRemoveDataMark with a device not in map
1202      * @tc.expected: step1. out value = 0
1203      */
1204     Metadata meta;
1205     VirtualSingleVerSyncDBInterface storage;
1206     int errCode = meta.Initialize(&storage);
1207     ASSERT_EQ(errCode, E_OK);
1208 
1209     uint64_t val = 99; // 99 is the initial value of outValue
1210     uint64_t outValue = val;
1211     meta.GetRemoveDataMark("D1", "", outValue);
1212     EXPECT_EQ(outValue, 0u);
1213 
1214     /**
1215      * @tc.steps: step2. reset outValue, call GetDbCreateTime with a device not in map
1216      * @tc.expected: step2. out value = 0
1217      */
1218     outValue = val;
1219     meta.GetDbCreateTime("D1", "", outValue);
1220     EXPECT_EQ(outValue, 0u);
1221 
1222     /**
1223      * @tc.steps: step3. call ResetMetaDataAfterRemoveData with a device not in map
1224      * @tc.expected: step3. return -E_NOT_FOUND
1225      */
1226     EXPECT_EQ(meta.ResetMetaDataAfterRemoveData("D1", ""), -E_NOT_FOUND);
1227 }
1228 
1229 /**
1230  * @tc.name: GetDeleteKeyWaterMark 001
1231  * @tc.desc: Test metaData save and get deleteWaterMark.
1232  * @tc.type: FUNC
1233  * @tc.require:
1234  * @tc.author: zhangqiquan
1235  */
1236 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteKeyWaterMark001, TestSize.Level1)
1237 {
1238     VirtualSingleVerSyncDBInterface storage;
1239     Metadata meta;
1240 
1241     /**
1242      * @tc.steps: step1. initialize meta with storage
1243      * @tc.expected: step1. E_OK
1244      */
1245     int errCode = meta.Initialize(&storage);
1246     ASSERT_EQ(errCode, E_OK);
1247 
1248     /**
1249      * @tc.steps: step2. save receive and send watermark
1250      * @tc.expected: step2. E_OK
1251      */
1252     WaterMark w1 = 1;
1253     EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", "", w1), E_OK);
1254     EXPECT_EQ(meta.SetSendDeleteSyncWaterMark("D1", "", w1), E_OK);
1255 
1256     /**
1257      * @tc.steps: step3. get receive and send watermark
1258      * @tc.expected: step3. E_OK and get the latest value
1259      */
1260     WaterMark w = 0;
1261     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", "", w), E_OK);
1262     EXPECT_EQ(w1, w);
1263     EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", "", w), E_OK);
1264     EXPECT_EQ(w1, w);
1265 
1266     /**
1267      * @tc.steps: step4. set peer and local watermark
1268      * @tc.expected: step4. E_OK
1269      */
1270     WaterMark w2 = 2;
1271     EXPECT_EQ(meta.SaveLocalWaterMark("D1", "", w2), E_OK);
1272     EXPECT_EQ(meta.SavePeerWaterMark("D1", "", w2, true), E_OK);
1273 
1274     /**
1275      * @tc.steps: step5. get receive and send watermark
1276      * @tc.expected: step5. E_OK and get the w2
1277      */
1278     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", "", w), E_OK);
1279     EXPECT_EQ(w2, w);
1280     EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", "", w), E_OK);
1281     EXPECT_EQ(w2, w);
1282 
1283     /**
1284      * @tc.steps: step6. set peer and local watermark
1285      * @tc.expected: step6. E_OK
1286      */
1287     WaterMark w3 = 3;
1288     EXPECT_EQ(meta.SaveLocalWaterMark("D2", "", w3), E_OK);
1289     EXPECT_EQ(meta.SavePeerWaterMark("D2", "", w3, true), E_OK);
1290 
1291     /**
1292      * @tc.steps: step7. get receive and send watermark
1293      * @tc.expected: step7. E_OK and get the w3
1294      */
1295     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D2", "", w), E_OK);
1296     EXPECT_EQ(w3, w);
1297     EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D2", "", w), E_OK);
1298     EXPECT_EQ(w3, w);
1299 
1300     /**
1301      * @tc.steps: step8. get not exit receive and send watermark
1302      * @tc.expected: step8. E_OK and get the 0
1303      */
1304     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D3", "", w), E_OK);
1305     EXPECT_EQ(w, 0u);
1306     EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D3", "", w), E_OK);
1307     EXPECT_EQ(w, 0u);
1308 }
1309 
1310 /**
1311  * @tc.name: GetDeleteKeyWaterMark 002
1312  * @tc.desc: Test metaData save and get deleteWaterMark after push or pull mode.
1313  * @tc.type: FUNC
1314  * @tc.require:
1315  * @tc.author: zhangqiquan
1316  */
1317 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteKeyWaterMark002, TestSize.Level1)
1318 {
1319     VirtualSingleVerSyncDBInterface storage;
1320     Metadata meta;
1321 
1322     /**
1323      * @tc.steps: step1. initialize meta with storage
1324      * @tc.expected: step1. E_OK
1325      */
1326     int errCode = meta.Initialize(&storage);
1327     ASSERT_EQ(errCode, E_OK);
1328 
1329     /**
1330      * @tc.steps: step2. set peer and local watermark
1331      * @tc.expected: step2. E_OK
1332      */
1333     WaterMark w1 = 3;
1334     EXPECT_EQ(meta.SaveLocalWaterMark("D1", "", w1), E_OK);
1335     EXPECT_EQ(meta.SavePeerWaterMark("D1", "", w1, true), E_OK);
1336 
1337     /**
1338      * @tc.steps: step2. save receive and send watermark
1339      * @tc.expected: step2. E_OK
1340      */
1341     WaterMark w2 = 1;
1342     EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", "", w2), E_OK);
1343     EXPECT_EQ(meta.SetSendDeleteSyncWaterMark("D1", "", w2), E_OK);
1344 
1345     /**
1346      * @tc.steps: step3. get receive and send watermark
1347      * @tc.expected: step3. E_OK and get the bigger value
1348      */
1349     WaterMark w = 0;
1350     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", "", w), E_OK);
1351     EXPECT_EQ(w1, w);
1352     EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", "", w), E_OK);
1353     EXPECT_EQ(w1, w);
1354 }
1355 
1356 /**
1357  * @tc.name: ClearDeleteKeyWaterMark 001
1358  * @tc.desc: Test metaData clear watermark function.
1359  * @tc.type: FUNC
1360  * @tc.require:
1361  * @tc.author: zhangqiquan
1362  */
1363 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearDeleteKeyWaterMark001, TestSize.Level1)
1364 {
1365     VirtualSingleVerSyncDBInterface storage;
1366     Metadata meta;
1367 
1368     /**
1369      * @tc.steps: step1. initialize meta with storage
1370      * @tc.expected: step1. E_OK
1371      */
1372     int errCode = meta.Initialize(&storage);
1373     ASSERT_EQ(errCode, E_OK);
1374 
1375     /**
1376      * @tc.steps: step2. save receive watermark
1377      * @tc.expected: step2. E_OK
1378      */
1379     WaterMark w1 = 1;
1380     EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", "", w1), E_OK);
1381 
1382     /**
1383      * @tc.steps: step3. erase peer watermark
1384      * @tc.expected: step3. E_OK
1385      */
1386     EXPECT_EQ(meta.EraseDeviceWaterMark("D1", true), E_OK);
1387 
1388     /**
1389      * @tc.steps: step4. get receive watermark
1390      * @tc.expected: step4. E_OK receive watermark is zero
1391      */
1392     WaterMark w2 = -1;
1393     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", "", w2), E_OK);
1394     EXPECT_EQ(w2, 0u);
1395 
1396     /**
1397      * @tc.steps: step5. set peer watermark
1398      * @tc.expected: step5. E_OK
1399      */
1400     WaterMark w3 = 2;
1401     EXPECT_EQ(meta.SavePeerWaterMark("D1", "", w3, true), E_OK);
1402 
1403     /**
1404      * @tc.steps: step6. get receive watermark
1405      * @tc.expected: step6. E_OK receive watermark is peer watermark
1406      */
1407     WaterMark w4 = -1;
1408     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", "", w4), E_OK);
1409     EXPECT_EQ(w4, w3);
1410 }
1411 
1412 /**
1413  * @tc.name: VerifyCacheAndDb 001
1414  * @tc.desc: Test metaData watermark cache and db are consistent and correct.
1415  * @tc.type: FUNC
1416  * @tc.require:
1417  * @tc.author: zhangqiquan
1418  */
1419 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDataQuerySync001, TestSize.Level1)
1420 {
1421     Metadata meta;
1422     VirtualSingleVerSyncDBInterface storage;
1423 
1424     /**
1425      * @tc.steps: step1. initialize meta with storage
1426      * @tc.expected: step1. E_OK
1427      */
1428     int errCode = meta.Initialize(&storage);
1429     ASSERT_EQ(errCode, E_OK);
1430 
1431     const std::string deviceId = "D1";
1432     const std::string queryId = "Q1";
1433 
1434     /**
1435     * @tc.steps: step2. save deleteSync watermark
1436     * @tc.expected: step2. E_OK
1437     */
1438     WaterMark deleteWaterMark = 1;
1439     EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark(deviceId, "", deleteWaterMark), E_OK);
1440     EXPECT_EQ(meta.SetSendDeleteSyncWaterMark(deviceId, "", deleteWaterMark), E_OK);
1441 
1442     /**
1443     * @tc.steps: step3. save querySync watermark
1444     * @tc.expected: step2. E_OK
1445     */
1446     WaterMark queryWaterMark = 2;
1447     EXPECT_EQ(meta.SetRecvQueryWaterMark(queryId, deviceId, "", queryWaterMark), E_OK);
1448     EXPECT_EQ(meta.SetSendQueryWaterMark(queryId, deviceId, "", queryWaterMark), E_OK);
1449 
1450     /**
1451     * @tc.steps: step4. initialize meta with storage
1452     * @tc.expected: step4. E_OK
1453     */
1454     Metadata anotherMeta;
1455     ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1456 
1457     /**
1458     * @tc.steps: step5. verify delete sync data
1459     * @tc.expected: step5. E_OK and waterMark equal to deleteWaterMark
1460     */
1461     WaterMark waterMark;
1462     EXPECT_EQ(anotherMeta.GetRecvDeleteSyncWaterMark(deviceId, "", waterMark), E_OK);
1463     EXPECT_EQ(waterMark, deleteWaterMark);
1464     EXPECT_EQ(anotherMeta.GetSendDeleteSyncWaterMark(deviceId, "", waterMark), E_OK);
1465     EXPECT_EQ(waterMark, deleteWaterMark);
1466 
1467     /**
1468     * @tc.steps: step6. verify query sync data
1469     * @tc.expected: step6. E_OK and waterMark equal to queryWaterMark
1470     */
1471     EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark(queryId, deviceId, "", waterMark), E_OK);
1472     EXPECT_EQ(waterMark, queryWaterMark);
1473     EXPECT_EQ(anotherMeta.GetSendQueryWaterMark(queryId, deviceId, "", waterMark), E_OK);
1474     EXPECT_EQ(waterMark, queryWaterMark);
1475 }
1476 
1477 /**
1478  * @tc.name: VerifyLruMap 001
1479  * @tc.desc: Test metaData watermark cache lru ability.
1480  * @tc.type: FUNC
1481  * @tc.require:
1482  * @tc.author: zhangqiquan
1483  */
1484 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyLruMap001, TestSize.Level1)
1485 {
1486     LruMap<std::string, QueryWaterMark> lruMap;
1487     const int maxCacheItems = 200;
1488 
1489     /**
1490     * @tc.steps: step1. fill items to LruMap
1491     * @tc.expected: step1. E_OK
1492     */
1493     const int startCount = 0;
1494     for (int i = startCount; i < maxCacheItems; i++) {
1495         std::string key = std::to_string(i);
1496         QueryWaterMark value;
1497         value.recvWaterMark = static_cast<uint64_t>(i + 1);
1498         EXPECT_EQ(lruMap.Put(key, value), E_OK);
1499     }
1500 
1501     /**
1502     * @tc.steps: step2. get the first item
1503     * @tc.expected: step2. E_OK first item will move to last
1504     */
1505     std::string firstItemKey = std::to_string(startCount);
1506     QueryWaterMark firstItemValue;
1507     EXPECT_EQ(lruMap.Get(firstItemKey, firstItemValue), E_OK);
1508     EXPECT_EQ(firstItemValue.recvWaterMark, 1u);
1509 
1510     /**
1511     * @tc.steps: step3. insert new items to LruMap
1512     * @tc.expected: step3. the second items was removed
1513     */
1514     std::string key = std::to_string(maxCacheItems);
1515     QueryWaterMark value;
1516     value.recvWaterMark = maxCacheItems;
1517     EXPECT_EQ(lruMap.Put(key, value), E_OK);
1518 
1519     /**
1520     * @tc.steps: step4. get the second item
1521     * @tc.expected: step4. E_NOT_FOUND it was removed by algorithm
1522     */
1523     std::string secondItemKey = std::to_string(startCount + 1);
1524     QueryWaterMark secondItemValue;
1525     EXPECT_EQ(lruMap.Get(secondItemKey, secondItemValue), -E_NOT_FOUND);
1526     EXPECT_EQ(secondItemValue.recvWaterMark, 0u);
1527 }
1528 
1529 /**
1530  * @tc.name: VerifyMetaDataInit 001
1531  * @tc.desc: Test metaData init correctly
1532  * @tc.type: FUNC
1533  * @tc.require:
1534  * @tc.author: zhangqiquan
1535  */
1536 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDataInit001, TestSize.Level1)
1537 {
1538     Metadata meta;
1539     VirtualSingleVerSyncDBInterface storage;
1540 
1541     /**
1542     * @tc.steps: step1. initialize meta with storage
1543     * @tc.expected: step1. E_OK
1544     */
1545     ASSERT_EQ(meta.Initialize(&storage), E_OK);
1546 
1547     DeviceID deviceA = "DeviceA";
1548     DeviceID deviceB = "DeviceA";
1549     WaterMark setWaterMark = 1;
1550 
1551     /**
1552     * @tc.steps: step2. meta save and get waterMark
1553     * @tc.expected: step2. expect get the same waterMark
1554     */
1555     EXPECT_EQ(meta.SaveLocalWaterMark(deviceA, "", setWaterMark), E_OK);
1556     EXPECT_EQ(meta.SaveLocalWaterMark(deviceB, "", setWaterMark), E_OK);
1557     WaterMark getWaterMark = 0;
1558     meta.GetLocalWaterMark(deviceA, "", getWaterMark);
1559     EXPECT_EQ(getWaterMark, setWaterMark);
1560     meta.GetLocalWaterMark(deviceB, "", getWaterMark);
1561     EXPECT_EQ(getWaterMark, setWaterMark);
1562 
1563 
1564     /**
1565     * @tc.steps: step3. init again
1566     * @tc.expected: step3. E_OK
1567     */
1568     Metadata anotherMeta;
1569     ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1570 
1571     /**
1572     * @tc.steps: step4. get waterMark again
1573     * @tc.expected: step4. expect get the same waterMark
1574     */
1575     anotherMeta.GetLocalWaterMark(deviceA, "", getWaterMark);
1576     EXPECT_EQ(getWaterMark, setWaterMark);
1577     anotherMeta.GetLocalWaterMark(deviceB, "", getWaterMark);
1578     EXPECT_EQ(getWaterMark, setWaterMark);
1579 }
1580 
1581 namespace {
InitVerifyStorageEnvironment(Metadata & meta,VirtualSingleVerSyncDBInterface & storage,const std::string & deviceId,const int & startCount,const uint32_t & maxStoreItems)1582 void InitVerifyStorageEnvironment(Metadata &meta, VirtualSingleVerSyncDBInterface &storage,
1583     const std::string &deviceId, const int &startCount, const uint32_t &maxStoreItems)
1584 {
1585     /**
1586     * @tc.steps: step1. initialize meta with storage
1587     * @tc.expected: step1. E_OK
1588     */
1589     ASSERT_EQ(meta.Initialize(&storage), E_OK);
1590 
1591     /**
1592     * @tc.steps: step2. fill items to metadata
1593     * @tc.expected: step2. E_OK
1594     */
1595     for (uint32_t i = startCount; i < maxStoreItems; i++) {
1596         std::string queryId = std::to_string(i);
1597         WaterMark recvWaterMark = i + 1;
1598         EXPECT_EQ(meta.SetRecvQueryWaterMark(queryId, deviceId, "", recvWaterMark), E_OK);
1599     }
1600 }
1601 }
1602 
1603 /**
1604  * @tc.name: VerifyManagerQuerySyncStorage 001
1605  * @tc.desc: Test metaData remove least used querySync storage items.
1606  * @tc.type: FUNC
1607  * @tc.require:
1608  * @tc.author: zhangqiquan
1609  */
1610 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyManagerQuerySyncStorage001, TestSize.Level3)
1611 {
1612     Metadata meta;
1613     VirtualSingleVerSyncDBInterface storage;
1614     const uint32_t maxStoreItems = 100000;
1615     const int startCount = 0;
1616     const std::string deviceId = "Device";
1617 
1618     InitVerifyStorageEnvironment(meta, storage, deviceId, startCount, maxStoreItems);
1619 
1620     /**
1621     * @tc.steps: step3. insert new items to metadata
1622     * @tc.expected: step3. E_OK
1623     */
1624     std::string newQueryId = std::to_string(maxStoreItems);
1625     WaterMark newWaterMark = maxStoreItems + 1;
1626     EXPECT_EQ(meta.SetRecvQueryWaterMark(newQueryId, deviceId, "", newWaterMark), E_OK);
1627 
1628     /**
1629     * @tc.steps: step4. touch the first item
1630     * @tc.expected: step4. E_OK update first item used time
1631     */
1632     std::string firstItemKey = std::to_string(startCount);
1633     WaterMark firstWaterMark = 11u;
1634     EXPECT_EQ(meta.SetRecvQueryWaterMark(firstItemKey, deviceId, "", firstWaterMark), E_OK);
1635 
1636     /**
1637     * @tc.steps: step5. initialize new meta with storage
1638     * @tc.expected: step5. the second item will be removed
1639     */
1640     Metadata newMeta;
1641     ASSERT_EQ(newMeta.Initialize(&storage), E_OK);
1642 
1643     /**
1644     * @tc.steps: step6. touch the first item
1645     * @tc.expected: step6. E_OK it still exist
1646     */
1647     WaterMark exceptWaterMark;
1648     EXPECT_EQ(newMeta.GetRecvQueryWaterMark(firstItemKey, deviceId, "", exceptWaterMark), E_OK);
1649     EXPECT_EQ(exceptWaterMark, firstWaterMark);
1650 
1651     /**
1652     * @tc.steps: step7. get the second item
1653     * @tc.expected: step7. NOT_FOUND secondWaterMark is zero
1654     */
1655     WaterMark secondWaterMark;
1656     std::string secondQueryId = std::to_string(startCount + 1);
1657     EXPECT_EQ(newMeta.GetRecvQueryWaterMark(secondQueryId, deviceId, "", secondWaterMark), E_OK);
1658     EXPECT_EQ(secondWaterMark, 0u);
1659 }
1660 
1661 /**
1662  * @tc.name: VerifyMetaDbCreateTime 001
1663  * @tc.desc: Test metaData get and set cbCreateTime.
1664  * @tc.type: FUNC
1665  * @tc.require:
1666  * @tc.author: zhuwentao
1667  */
1668 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDbCreateTime001, TestSize.Level1)
1669 {
1670     Metadata meta;
1671     VirtualSingleVerSyncDBInterface storage;
1672     /**
1673      * @tc.steps: step1. initialize meta with storage
1674      * @tc.expected: step1. E_OK
1675      */
1676     int errCode = meta.Initialize(&storage);
1677     ASSERT_EQ(errCode, E_OK);
1678     /**
1679      * @tc.steps: step2. set local and peer watermark and dbCreateTime
1680      * @tc.expected: step4. E_OK
1681      */
1682     WaterMark value = 2;
1683     EXPECT_EQ(meta.SaveLocalWaterMark("D1", "", value), E_OK);
1684     EXPECT_EQ(meta.SavePeerWaterMark("D1", "", value, true), E_OK);
1685     EXPECT_EQ(meta.SetDbCreateTime("D1", "", 10u, true), E_OK);
1686     /**
1687      * @tc.steps: step3. check peer and local watermark and dbCreateTime
1688      * @tc.expected: step4. E_OK
1689      */
1690     WaterMark curValue = 0;
1691     meta.GetLocalWaterMark("D1", "", curValue);
1692     EXPECT_EQ(value, curValue);
1693     meta.GetPeerWaterMark("D1", "", curValue);
1694     EXPECT_EQ(value, curValue);
1695     uint64_t curDbCreatTime = 0;
1696     meta.GetDbCreateTime("D1", "", curDbCreatTime);
1697     EXPECT_EQ(curDbCreatTime, 10u);
1698     /**
1699      * @tc.steps: step3. change dbCreateTime and check
1700      * @tc.expected: step4. E_OK
1701      */
1702     EXPECT_EQ(meta.SetDbCreateTime("D1", "", 20u, true), E_OK);
1703     uint64_t clearDeviceDataMark = INT_MAX;
1704     meta.GetRemoveDataMark("D1", "", clearDeviceDataMark);
1705     EXPECT_EQ(clearDeviceDataMark, 1u);
1706     EXPECT_EQ(meta.ResetMetaDataAfterRemoveData("D1", ""), E_OK);
1707     meta.GetRemoveDataMark("D1", "", clearDeviceDataMark);
1708     EXPECT_EQ(clearDeviceDataMark, 0u);
1709     meta.GetDbCreateTime("D1", "", curDbCreatTime);
1710     EXPECT_EQ(curDbCreatTime, 20u);
1711 }
1712 
1713 /**
1714  * @tc.name: VerifyManagerQuerySyncStorage 002
1715  * @tc.desc: Test metaData remove least used querySync storage items when exit wrong data.
1716  * @tc.type: FUNC
1717  * @tc.require:
1718  * @tc.author: zhangqiquan
1719  */
1720 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyManagerQuerySyncStorage002, TestSize.Level3)
1721 {
1722     Metadata meta;
1723     VirtualSingleVerSyncDBInterface storage;
1724     const uint32_t maxStoreItems = 100000;
1725     const int startCount = 0;
1726     const std::string deviceId = "Device";
1727 
1728     InitVerifyStorageEnvironment(meta, storage, deviceId, startCount, maxStoreItems);
1729 
1730     /**
1731     * @tc.steps: step3. insert a wrong Value
1732     * @tc.expected: step3. E_OK
1733     */
1734     std::string newQueryId = std::to_string(maxStoreItems);
1735     Key dbKey;
1736     DBCommon::StringToVector(QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey()
1737         + DBCommon::TransferHashString(deviceId) + newQueryId, dbKey);
1738     Value wrongValue;
1739     EXPECT_EQ(storage.PutMetaData(dbKey, wrongValue, false), E_OK);
1740 
1741     /**
1742     * @tc.steps: step4. initialize new meta with storage
1743     * @tc.expected: step4. E_OK
1744     */
1745     Metadata newMeta;
1746     ASSERT_EQ(newMeta.Initialize(&storage), E_OK);
1747 
1748     /**
1749     * @tc.steps: step5. touch the first item
1750     * @tc.expected: step5. E_OK still exit
1751     */
1752     std::string firstItemKey = std::to_string(startCount);
1753     WaterMark exceptWaterMark;
1754     EXPECT_EQ(newMeta.GetRecvQueryWaterMark(firstItemKey, deviceId, "", exceptWaterMark), E_OK);
1755     EXPECT_EQ(exceptWaterMark, 1u);
1756 }
1757 
1758 /**
1759  * @tc.name: AllPredicateQuerySync001
1760  * @tc.desc: Test normal push sync for AllPredicate data.
1761  * @tc.type: FUNC
1762  * @tc.require:
1763  * @tc.author: zhuwentao
1764  */
1765 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync001, TestSize.Level1)
1766 {
1767     /**
1768      * @tc.steps: step1. InitSchemaDb
1769      */
1770     InitSchemaDb();
1771     DBStatus status = OK;
1772     std::vector<std::string> devices;
1773     devices.push_back(g_deviceB->GetDeviceId());
1774 
1775     /**
1776      * @tc.steps: step2. deviceA put {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1777                          {key21, SCHEMA_VALUE2} - {key29, SCHEMA_VALUE2}
1778      */
1779     Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1780     Value value2(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end());
1781     Key key = {'1'};
1782     Key key2 = {'2'};
1783     const int dataSize = 4000;
1784     for (int i = 0; i < dataSize; i++) {
1785         key.push_back(i);
1786         key2.push_back(i);
1787         status = g_schemaKvDelegatePtr->Put(key, value);
1788         ASSERT_TRUE(status == OK);
1789         status = g_schemaKvDelegatePtr->Put(key2, value2);
1790         ASSERT_TRUE(status == OK);
1791         key.pop_back();
1792         key2.pop_back();
1793     }
1794     ASSERT_TRUE(status == OK);
1795 
1796     /**
1797      * @tc.steps: step3. deviceA call query sync and wait
1798      * @tc.expected: step3. sync should return OK.
1799      */
1800     Query query = Query::Select().EqualTo("$.field_name1", 1);
1801     std::map<std::string, DBStatus> result;
1802     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1803     ASSERT_TRUE(status == OK);
1804 
1805     /**
1806      * @tc.expected: step4. onComplete should be called, DeviceB have {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1807      */
1808     ASSERT_TRUE(result.size() == devices.size());
1809     for (const auto &pair : result) {
1810         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1811         EXPECT_TRUE(pair.second == OK);
1812     }
1813     VirtualDataItem item;
1814     VirtualDataItem item2;
1815     for (int i = 0; i < dataSize; i++) {
1816         key.push_back(i);
1817         key2.push_back(i);
1818         g_deviceB->GetData(key, item);
1819         EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
1820         EXPECT_TRUE(item.value == value);
1821         key.pop_back();
1822         key2.pop_back();
1823     }
1824 }
1825 
1826 /**
1827  * @tc.name: AllPredicateQuerySync002
1828  * @tc.desc: Test wrong query param push sync for AllPredicate data.
1829  * @tc.type: FUNC
1830  * @tc.require:
1831  * @tc.author: zhuwentao
1832  */
1833 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync002, TestSize.Level1)
1834 {
1835     /**
1836      * @tc.steps: step1. InitSchemaDb
1837      */
1838     InitSchemaDb();
1839     DBStatus status = OK;
1840     std::vector<std::string> devices;
1841     devices.push_back(g_deviceB->GetDeviceId());
1842 
1843     /**
1844      * @tc.steps: step2. deviceA call query sync and wait
1845      * @tc.expected: step2. sync should return INVALID_QUERY_FIELD
1846      */
1847     Query query = Query::Select().GreaterThan("field_name11", 10);
1848     std::map<std::string, DBStatus> result;
1849     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1850     ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1851     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, query);
1852     ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1853     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
1854     ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1855 }
1856 
1857 /**
1858  * @tc.name: AllPredicateQuerySync003
1859  * @tc.desc: Test normal push sync for AllPredicate data with limit
1860  * @tc.type: FUNC
1861  * @tc.require:
1862  * @tc.author: zhuwentao
1863  */
1864 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync003, TestSize.Level1)
1865 {
1866     /**
1867      * @tc.steps: step1. InitSchemaDb
1868      */
1869     InitSchemaDb();
1870     DBStatus status = OK;
1871     std::vector<std::string> devices;
1872     devices.push_back(g_deviceB->GetDeviceId());
1873 
1874     /**
1875      * @tc.steps: step2. deviceA put {key1, SCHEMA_VALUE1} - {key9, SCHEMA_VALUE1}
1876      */
1877     Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1878     Value value2(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end());
1879     Key key = {'1'};
1880     Key key2 = {'2'};
1881     const int dataSize = 10;
1882     for (int i = 0; i < dataSize; i++) {
1883         key.push_back(i);
1884         key2.push_back(i);
1885         status = g_schemaKvDelegatePtr->Put(key, value);
1886         ASSERT_TRUE(status == OK);
1887         status = g_schemaKvDelegatePtr->Put(key2, value2);
1888         ASSERT_TRUE(status == OK);
1889         key.pop_back();
1890         key2.pop_back();
1891     }
1892     ASSERT_TRUE(status == OK);
1893 
1894     /**
1895      * @tc.steps: step3. deviceA call query sync with limit and wait
1896      * @tc.expected: step3. sync should return OK.
1897      */
1898     Query query = Query::Select().EqualTo("$.field_name1", 1).Limit(20, 0);
1899     std::map<std::string, DBStatus> result;
1900     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1901     ASSERT_TRUE(status == OK);
1902 
1903     /**
1904      * @tc.expected: step4. onComplete should be called, DeviceB have {key1, SCHEMA_VALUE1} - {key9, SCHEMA_VALUE1}
1905      */
1906     ASSERT_TRUE(result.size() == devices.size());
1907     for (const auto &pair : result) {
1908         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1909         EXPECT_TRUE(pair.second == OK);
1910     }
1911     VirtualDataItem item;
1912     VirtualDataItem item2;
1913     for (int i = 0; i < dataSize; i++) {
1914         key.push_back(i);
1915         key2.push_back(i);
1916         g_deviceB->GetData(key, item);
1917         EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
1918         EXPECT_TRUE(item.value == value);
1919         key.pop_back();
1920         key2.pop_back();
1921     }
1922 }
1923 
1924 /**
1925  * @tc.name: AllPredicateQuerySync004
1926  * @tc.desc: Test normal pull sync for AllPredicate data.
1927  * @tc.type: FUNC
1928  * @tc.require:
1929  * @tc.author: zhuwentao
1930  */
1931 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync004, TestSize.Level1)
1932 {
1933     /**
1934      * @tc.steps: step1. InitSchemaDb
1935      */
1936     InitSchemaDb();
1937     std::vector<std::string> devices;
1938     devices.push_back(g_deviceB->GetDeviceId());
1939 
1940     /**
1941      * @tc.steps: step2. deviceB put {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1942      */
1943     Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1944     Key key = {'1'};
1945     const int dataSize = 10;
1946     for (int i = 0; i < dataSize; i++) {
1947         key.push_back(i);
1948         g_deviceB->PutData(key, value, 10 + i, 0);
1949         key.pop_back();
1950     }
1951 
1952     /**
1953      * @tc.steps: step3. deviceA call query sync and wait
1954      * @tc.expected: step3. sync should return OK.
1955      */
1956     Query query = Query::Select().EqualTo("$.field_name1", 1);
1957     std::map<std::string, DBStatus> result;
1958     DBStatus status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
1959     ASSERT_TRUE(status == OK);
1960 
1961     /**
1962      * @tc.expected: step4. onComplete should be called, DeviceA have {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1963      */
1964     ASSERT_TRUE(result.size() == devices.size());
1965     for (const auto &pair : result) {
1966         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1967         EXPECT_TRUE(pair.second == OK);
1968     }
1969     Value item;
1970     Value item2;
1971     for (int i = 0; i < dataSize; i++) {
1972         key.push_back(i);
1973         g_schemaKvDelegatePtr->Get(key, item);
1974         EXPECT_TRUE(item == value);
1975         key.pop_back();
1976     }
1977 }