1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include <gtest/gtest.h>
#include <memory>
#include <thread>
#include <utility>

#include "db_common.h"
#include "db_constant.h"
#include "distributeddb_data_generate_unit_test.h"
#include "distributeddb_tools_unit_test.h"
#include "generic_single_ver_kv_entry.h"
#include "kv_store_nb_delegate.h"
#include "kv_virtual_device.h"
#include "platform_specific.h"
#include "query.h"
#include "query_sync_object.h"
#include "single_ver_data_sync.h"
#include "single_ver_serialize_manager.h"
#include "sync_types.h"
#include "virtual_communicator.h"
#include "virtual_communicator_aggregator.h"
#include "virtual_single_ver_sync_db_Interface.h"
35
36 using namespace testing::ext;
37 using namespace DistributedDB;
38 using namespace DistributedDBUnitTest;
39 using namespace std;
40
// File-local state shared by every test case in this file.
namespace {
    // Root directory of the test databases; passed to TestDirInit() in SetUpTestCase().
    string g_testDir;
    // Store identifiers used for the plain store and the schema store respectively.
    const string STORE_ID = "kv_store_sync_test";
    const string SCHEMA_STORE_ID = "kv_store_sync_schema_test";
    // Device id given to the virtual remote device created in SetUp().
    const std::string DEVICE_B = "deviceB";

    // One delegate manager per store kind (plain / schema), each with its own app id.
    KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
    KvStoreDelegateManager g_schemaMgr(SCHEMA_APP_ID, USER_ID);
    // Store configuration; dataDir is assigned from g_testDir in InitNormalDb()/InitSchemaDb().
    KvStoreConfig g_config;
    // Test helper used by the cases to run SyncTest().
    DistributedDBToolsUnitTest g_tool;
    // Status/delegate pairs bound into the GetKvStore callbacks below.
    // They start as INVALID_ARGS / nullptr, i.e. "no store opened yet".
    DBStatus g_kvDelegateStatus = INVALID_ARGS;
    DBStatus g_schemaKvDelegateStatus = INVALID_ARGS;
    KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
    KvStoreNbDelegate* g_schemaKvDelegatePtr = nullptr;
    // Virtual communicator aggregator installed into RuntimeContext in SetUpTestCase().
    VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
    // Virtual remote device ("device B"); created in SetUp() and deleted in TearDown().
    KvVirtualDevice *g_deviceB = nullptr;

    // The callbacks are invoked as function<void(DBStatus, KvStoreNbDelegate*)>: the two placeholders
    // are the arguments supplied by GetKvStore, while the trailing std::ref arguments tie each callback
    // to the matching status/delegate globals above (presumably written by KvStoreNbDelegateCallback).
    auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
        placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
    auto g_schemaKvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
        placeholders::_1, placeholders::_2, std::ref(g_schemaKvDelegateStatus), std::ref(g_schemaKvDelegatePtr));
    // Schema (version 1.0, STRICT mode) declaring ten fields, with indexes on field_name1 and field_name2.
    const string SCHEMA_STRING =
        "{\"SCHEMA_VERSION\":\"1.0\","
        "\"SCHEMA_MODE\":\"STRICT\","
        "\"SCHEMA_DEFINE\":{"
        "\"field_name1\":\"BOOL\","
        "\"field_name2\":\"BOOL\","
        "\"field_name3\":\"INTEGER, NOT NULL\","
        "\"field_name4\":\"LONG, DEFAULT 100\","
        "\"field_name5\":\"DOUBLE, NOT NULL, DEFAULT 3.14\","
        "\"field_name6\":\"STRING, NOT NULL, DEFAULT '3.1415'\","
        "\"field_name7\":\"LONG, DEFAULT 100\","
        "\"field_name8\":\"LONG, DEFAULT 100\","
        "\"field_name9\":\"LONG, DEFAULT 100\","
        "\"field_name10\":\"LONG, DEFAULT 100\""
        "},"
        "\"SCHEMA_INDEXES\":[\"$.field_name1\", \"$.field_name2\"]}";

    // Sample JSON value that fills all ten schema fields (field_name1 = true, field_name2 = false).
    const std::string SCHEMA_VALUE1 =
        "{\"field_name1\":true,"
        "\"field_name2\":false,"
        "\"field_name3\":10,"
        "\"field_name4\":20,"
        "\"field_name5\":3.14,"
        "\"field_name6\":\"3.1415\","
        "\"field_name7\":100,"
        "\"field_name8\":100,"
        "\"field_name9\":100,"
        "\"field_name10\":100}";

    // Second sample value; differs from SCHEMA_VALUE1 in field_name1 .. field_name4 only.
    const std::string SCHEMA_VALUE2 =
        "{\"field_name1\":false,"
        "\"field_name2\":true,"
        "\"field_name3\":100,"
        "\"field_name4\":200,"
        "\"field_name5\":3.14,"
        "\"field_name6\":\"3.1415\","
        "\"field_name7\":100,"
        "\"field_name8\":100,"
        "\"field_name9\":100,"
        "\"field_name10\":100}";
}
104
105 class DistributedDBSingleVerP2PQuerySyncTest : public testing::Test {
106 public:
107 static void SetUpTestCase(void);
108 static void TearDownTestCase(void);
109 void SetUp();
110 void TearDown();
111 };
112
SetUpTestCase(void)113 void DistributedDBSingleVerP2PQuerySyncTest::SetUpTestCase(void)
114 {
115 /**
116 * @tc.setup: Init datadir and Virtual Communicator.
117 */
118 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
119 string dir = g_testDir + "/single_ver";
120 DIR* dirTmp = opendir(dir.c_str());
121 if (dirTmp == nullptr) {
122 OS::MakeDBDirectory(dir);
123 } else {
124 closedir(dirTmp);
125 }
126
127 g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
128 ASSERT_TRUE(g_communicatorAggregator != nullptr);
129 RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
130 }
131
TearDownTestCase(void)132 void DistributedDBSingleVerP2PQuerySyncTest::TearDownTestCase(void)
133 {
134 /**
135 * @tc.teardown: Release virtual Communicator and clear data dir.
136 */
137 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
138 LOGE("rm test db files error!");
139 }
140 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
141 }
142
SetUp(void)143 void DistributedDBSingleVerP2PQuerySyncTest::SetUp(void)
144 {
145 DistributedDBToolsUnitTest::PrintTestCaseInfo();
146 /**
147 * @tc.setup: create virtual device B and get a KvStoreNbDelegate as deviceA
148 */
149 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
150 ASSERT_TRUE(g_deviceB != nullptr);
151 VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
152 ASSERT_TRUE(syncInterfaceB != nullptr);
153 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
154 }
155
TearDown(void)156 void DistributedDBSingleVerP2PQuerySyncTest::TearDown(void)
157 {
158 /**
159 * @tc.teardown: Release device A, B
160 */
161 if (g_kvDelegatePtr != nullptr) {
162 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
163 g_kvDelegatePtr = nullptr;
164 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
165 LOGD("delete kv store status %d", status);
166 ASSERT_TRUE(status == OK);
167 }
168 if (g_schemaKvDelegatePtr != nullptr) {
169 ASSERT_EQ(g_schemaMgr.CloseKvStore(g_schemaKvDelegatePtr), OK);
170 g_schemaKvDelegatePtr = nullptr;
171 DBStatus status = g_schemaMgr.DeleteKvStore(SCHEMA_STORE_ID);
172 LOGD("delete kv store status %d", status);
173 ASSERT_TRUE(status == OK);
174 }
175 if (g_deviceB != nullptr) {
176 delete g_deviceB;
177 g_deviceB = nullptr;
178 }
179 PermissionCheckCallbackV2 nullCallback;
180 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
181 }
182
InitNormalDb()183 void InitNormalDb()
184 {
185 g_config.dataDir = g_testDir;
186 g_mgr.SetKvStoreConfig(g_config);
187 KvStoreNbDelegate::Option option;
188 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
189 ASSERT_TRUE(g_kvDelegateStatus == OK);
190 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
191 }
192
InitSchemaDb()193 void InitSchemaDb()
194 {
195 g_config.dataDir = g_testDir;
196 g_schemaMgr.SetKvStoreConfig(g_config);
197 KvStoreNbDelegate::Option option;
198 option.schema = SCHEMA_STRING;
199 g_schemaMgr.GetKvStore(SCHEMA_STORE_ID, option, g_schemaKvDelegateCallback);
200 ASSERT_TRUE(g_schemaKvDelegateStatus == OK);
201 ASSERT_TRUE(g_schemaKvDelegatePtr != nullptr);
202 g_deviceB->SetSchema(option.schema);
203 }
204
205 /**
206 * @tc.name: Normal Sync 001
207 * @tc.desc: Test normal push sync for keyprefix data.
208 * @tc.type: FUNC
209 * @tc.require:
210 * @tc.author: xushaohua
211 */
212 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync001, TestSize.Level1)
213 {
214 InitNormalDb();
215 DBStatus status = OK;
216 std::vector<std::string> devices;
217 devices.push_back(g_deviceB->GetDeviceId());
218
219 /**
220 * @tc.steps: step1. deviceA put {k0, v0} - {k9, v9}
221 */
222 Key key = {'1'};
223 Value value = {'1'};
224 const int dataSize = 10;
225 for (int i = 0; i < dataSize; i++) {
226 key.push_back(i);
227 value.push_back(i);
228 status = g_kvDelegatePtr->Put(key, value);
229 ASSERT_TRUE(status == OK);
230 key.pop_back();
231 value.pop_back();
232 }
233 Key key2 = {'2'};
234 Value value2 = {'2'};
235 status = g_kvDelegatePtr->Put(key2, value2);
236 ASSERT_TRUE(status == OK);
237
238 /**
239 * @tc.steps: step2. deviceA call query sync and wait
240 * @tc.expected: step2. sync should return OK.
241 */
242 Query query = Query::Select().PrefixKey(key);
243 std::map<std::string, DBStatus> result;
244 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
245 ASSERT_TRUE(status == OK);
246
247 /**
248 * @tc.expected: step3. onComplete should be called, DeviceB have {k1,v1} - {k9, v9}
249 */
250 ASSERT_TRUE(result.size() == devices.size());
251 for (const auto &pair : result) {
252 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
253 EXPECT_TRUE(pair.second == OK);
254 }
255 VirtualDataItem item;
256 for (int i = 0; i < dataSize; i++) {
257 key.push_back(i);
258 value.push_back(i);
259 g_deviceB->GetData(key, item);
260 EXPECT_TRUE(item.value == value);
261 key.pop_back();
262 value.pop_back();
263 }
264 EXPECT_TRUE(g_deviceB->GetData(key2, item) != E_OK);
265 }
266
267 /**
268 * @tc.name: Normal Sync 002
269 * @tc.desc: Test normal push sync for limit and offset.
270 * @tc.type: FUNC
271 * @tc.require:
272 * @tc.author: xushaohua
273 */
274 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync002, TestSize.Level1)
275 {
276 InitNormalDb();
277 DBStatus status = OK;
278 std::vector<std::string> devices;
279 devices.push_back(g_deviceB->GetDeviceId());
280
281 /**
282 * @tc.steps: step1. deviceA put {k0, v0} - {k9, v9}
283 */
284 Key key = {'1'};
285 Value value = {'1'};
286 const int dataSize = 10;
287 for (int i = 0; i < dataSize; i++) {
288 key.push_back(i);
289 value.push_back(i);
290 status = g_kvDelegatePtr->Put(key, value);
291 ASSERT_TRUE(status == OK);
292 key.pop_back();
293 value.pop_back();
294 }
295
296 /**
297 * @tc.steps: step2. deviceA call sync and wait
298 * @tc.expected: step2. sync should return OK.
299 */
300 const int limit = 5;
301 const int offset = 4;
302 Query query = Query::Select().PrefixKey(key).Limit(limit, offset);
303 std::map<std::string, DBStatus> result;
304 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
305 ASSERT_TRUE(status == OK);
306
307 /**
308 * @tc.expected: step3. onComplete should be called, DeviceB have {k4,v4} {k8, v8}
309 */
310 ASSERT_TRUE(result.size() == devices.size());
311 for (const auto &pair : result) {
312 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
313 EXPECT_TRUE(pair.second == OK);
314 }
315
316 VirtualDataItem item;
317 for (int i = limit - 1; i < limit + offset; i++) {
318 key.push_back(i);
319 value.push_back(i);
320 g_deviceB->GetData(key, item);
321 EXPECT_TRUE(item.value == value);
322 key.pop_back();
323 value.pop_back();
324 }
325 }
326
327 /**
328 * @tc.name: Normal Sync 001
329 * @tc.desc: Test normal push_and_pull sync for keyprefix data.
330 * @tc.type: FUNC
331 * @tc.require:
332 * @tc.author: zhuwentao
333 */
334 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync003, TestSize.Level1)
335 {
336 InitNormalDb();
337 DBStatus status = OK;
338 std::vector<std::string> devices;
339 devices.push_back(g_deviceB->GetDeviceId());
340
341 /**
342 * @tc.steps: step1. deviceA put {k, v}, {b, v}
343 */
344 Key key = {'1'};
345 Value value = {'1'};
346 const int dataSize = 10;
347 status = g_kvDelegatePtr->Put(key, value);
348 ASSERT_TRUE(status == OK);
349 Key key2 = {'2'};
350 Value value2 = {'2'};
351 status = g_kvDelegatePtr->Put(key2, value2);
352 ASSERT_TRUE(status == OK);
353
354 /**
355 * @tc.steps: step2. deviceB put {b0, v0} - {b9, v9}, {c, v}
356 */
357 for (int i = 0; i < dataSize; i++) {
358 key2.push_back(i);
359 value2.push_back(i);
360 g_deviceB->PutData(key2, value2, 10 + i, 0);
361 key2.pop_back();
362 value2.pop_back();
363 }
364 Key key3 = {'3'};
365 Value value3 = {'3'};
366 g_deviceB->PutData(key3, value3, 20, 0);
367
368 /**
369 * @tc.steps: step2. deviceA call query sync and wait
370 * @tc.expected: step2. sync should return OK.
371 */
372 Query query = Query::Select().PrefixKey(key2);
373 std::map<std::string, DBStatus> result;
374 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, query);
375 ASSERT_TRUE(status == OK);
376
377 /**
378 * @tc.expected: step3. onComplete should be called, DeviceA have {b0, v0} - {b9, v9}, DeviceB have {b, v}
379 */
380 ASSERT_TRUE(result.size() == devices.size());
381 for (const auto &pair : result) {
382 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
383 EXPECT_TRUE(pair.second == OK);
384 }
385 VirtualDataItem item;
386 Value tmpValue;
387 for (int i = 0; i < dataSize; i++) {
388 key2.push_back(i);
389 value2.push_back(i);
390 g_kvDelegatePtr->Get(key2, tmpValue);
391 EXPECT_TRUE(tmpValue == value2);
392 key2.pop_back();
393 value2.pop_back();
394 }
395 EXPECT_TRUE(g_deviceB->GetData(key, item) != E_OK);
396 EXPECT_TRUE(g_deviceB->GetData(key2, item) == E_OK);
397 g_kvDelegatePtr->Get(key3, tmpValue);
398 EXPECT_TRUE(tmpValue != value3);
399 }
400
401 /**
402 * @tc.name: Normal Sync 001
403 * @tc.desc: Test normal pull sync for keyprefix data.
404 * @tc.type: FUNC
405 * @tc.require:
406 * @tc.author: zhuwentao
407 */
408 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync004, TestSize.Level1)
409 {
410 InitNormalDb();
411 DBStatus status = OK;
412 std::vector<std::string> devices;
413 devices.push_back(g_deviceB->GetDeviceId());
414 /**
415 * @tc.steps: step1. deviceB put {k1, v1} - {k9, k9}, {b0, v0} - {b9, v9}
416 */
417 Key key = {'1'};
418 Value value = {'1'};
419 const int dataSize = 10;
420 Key key2 = {'2'};
421 Value value2 = {'2'};
422 vector<std::pair<Key, Value>> key1Vec;
423 vector<std::pair<Key, Value>> key2Vec;
424 for (int i = 0; i < dataSize; i++) {
425 Key tmpKey(key);
426 Value tmpValue(value);
427 tmpKey.push_back(i);
428 tmpValue.push_back(i);
429 key1Vec.push_back(pair<Key, Value> {tmpKey, tmpValue});
430 }
431 for (int i = 0; i < dataSize; i++) {
432 Key tmpKey(key2);
433 Value tmpValue(value2);
434 tmpKey.push_back(i);
435 tmpValue.push_back(i);
436 key2Vec.push_back(pair<Key, Value> {tmpKey, tmpValue});
437 }
438 for (int i = 0; i < dataSize; i++) {
439 g_deviceB->PutData(key2Vec[i].first, key2Vec[i].second, 20 + i, 0);
440 g_deviceB->PutData(key1Vec[i].first, key1Vec[i].second, 10 + i, 0);
441 }
442
443 /**
444 * @tc.steps: step2. deviceA call query sync and wait
445 * @tc.expected: step2. sync should return OK.
446 */
447 Query query = Query::Select().PrefixKey(key2);
448 std::map<std::string, DBStatus> result;
449 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
450 ASSERT_TRUE(status == OK);
451
452 /**
453 * @tc.expected: step3. onComplete should be called, DeviceA have {b0, v0} - {b9, v9}
454 */
455 ASSERT_TRUE(result.size() == devices.size());
456 for (const auto &pair : result) {
457 EXPECT_TRUE(pair.second == OK);
458 }
459 VirtualDataItem item;
460 Value tmpValue;
461 for (int i = 0; i < dataSize; i++) {
462 g_kvDelegatePtr->Get(key2Vec[i].first, tmpValue);
463 EXPECT_TRUE(tmpValue == key2Vec[i].second);
464 g_kvDelegatePtr->Get(key1Vec[i].first, tmpValue);
465 EXPECT_TRUE(tmpValue != key1Vec[i].second);
466 }
467 }
468
469 /**
470 * @tc.name: NormalSync005
471 * @tc.desc: Test normal push sync for inkeys query.
472 * @tc.type: FUNC
473 * @tc.require:
474 * @tc.author: lidongwei
475 */
476 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync005, TestSize.Level1)
477 {
478 InitNormalDb();
479 std::vector<std::string> devices;
480 devices.push_back(g_deviceB->GetDeviceId());
481
482 /**
483 * @tc.steps: step1. deviceA put K1-K5
484 */
485 ASSERT_EQ(g_kvDelegatePtr->PutBatch(
486 {{KEY_1, VALUE_1}, {KEY_2, VALUE_2}, {KEY_3, VALUE_3}, {KEY_4, VALUE_4}, {KEY_5, VALUE_5}}), OK);
487
488 /**
489 * @tc.steps: step2. deviceA sync K2,K4 and wait
490 * @tc.expected: step2. sync should return OK.
491 */
492 Query query = Query::Select().InKeys({KEY_2, KEY_4});
493 std::map<std::string, DBStatus> result;
494 ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), OK);
495
496 /**
497 * @tc.expected: step3. onComplete should be called.
498 */
499 ASSERT_EQ(result.size(), devices.size());
500 for (const auto &pair : result) {
501 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
502 EXPECT_EQ(pair.second, OK);
503 }
504
505 /**
506 * @tc.steps: step4. deviceB have K2K4 and have no K1K3K5.
507 * @tc.expected: step4. sync should return OK.
508 */
509 VirtualDataItem item;
510 EXPECT_EQ(g_deviceB->GetData(KEY_2, item), E_OK);
511 EXPECT_EQ(item.value, VALUE_2);
512 EXPECT_EQ(g_deviceB->GetData(KEY_4, item), E_OK);
513 EXPECT_EQ(item.value, VALUE_4);
514 EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
515 EXPECT_EQ(g_deviceB->GetData(KEY_3, item), -E_NOT_FOUND);
516 EXPECT_EQ(g_deviceB->GetData(KEY_5, item), -E_NOT_FOUND);
517
518 /**
519 * @tc.steps: step5. deviceA sync with invalid inkeys query
520 * @tc.expected: step5. sync failed and the rc is right.
521 */
522 query = Query::Select().InKeys({});
523 result.clear();
524 ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), INVALID_ARGS);
525
526 std::set<Key> keys;
527 for (uint8_t i = 0; i < DBConstant::MAX_BATCH_SIZE + 1; i++) {
528 Key key = { i };
529 keys.emplace(key);
530 }
531 query = Query::Select().InKeys(keys);
532 result.clear();
533 ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), OVER_MAX_LIMITS);
534
535 query = Query::Select().InKeys({{}});
536 result.clear();
537 ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), INVALID_ARGS);
538 }
539
540 /**
541 * @tc.name: NormalSync006
542 * @tc.desc: Test normal push sync with query by 32 devices;
543 * @tc.type: FUNC
544 * @tc.require:
545 * @tc.author: zhuwentao
546 */
547 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync006, TestSize.Level1)
548 {
549 /**
550 * @tc.steps: step1. init db and 32 devices
551 */
552 InitNormalDb();
553 uint32_t syncDevCount = 32u;
554 std::vector<KvVirtualDevice *> virtualDeviceVec(syncDevCount, nullptr);
555 const std::string device = "deviceTmp_";
556 std::vector<std::string> devices;
557 bool isError = false;
558 for (uint32_t i = 0; i < syncDevCount; i++) {
559 std::string tmpDev = device + std::to_string(i);
560 virtualDeviceVec[i] = new (std::nothrow) KvVirtualDevice(tmpDev);
561 if (virtualDeviceVec[i] == nullptr) {
562 isError = true;
563 break;
564 }
565 VirtualSingleVerSyncDBInterface *tmpSyncInterface = new (std::nothrow) VirtualSingleVerSyncDBInterface();
566 if (tmpSyncInterface == nullptr) {
567 isError = true;
568 break;
569 }
570 ASSERT_EQ(virtualDeviceVec[i]->Initialize(g_communicatorAggregator, tmpSyncInterface), E_OK);
571 devices.push_back(virtualDeviceVec[i]->GetDeviceId());
572 }
573 if (isError) {
574 for (uint32_t i = 0; i < syncDevCount; i++) {
575 if (virtualDeviceVec[i] != nullptr) {
576 delete virtualDeviceVec[i];
577 virtualDeviceVec[i] = nullptr;
578 }
579 }
580 ASSERT_TRUE(false);
581 }
582 /**
583 * @tc.steps: step2. deviceA put {k0, v0}
584 */
585 Key key = {'1'};
586 Value value = {'1'};
587 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
588 /**
589 * @tc.steps: step3. deviceA call query sync and wait
590 * @tc.expected: step3. sync should return OK.
591 */
592 Query query = Query::Select().PrefixKey(key);
593 std::map<std::string, DBStatus> result;
594 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query) == OK);
595
596 /**
597 * @tc.expected: step3. onComplete should be called, DeviceB have {k1,v1} - {k9, v9}
598 */
599 ASSERT_TRUE(result.size() == devices.size());
600 for (const auto &pair : result) {
601 EXPECT_TRUE(pair.second == OK);
602 }
603 VirtualDataItem item;
604 for (uint32_t i = 0; i < syncDevCount; i++) {
605 EXPECT_TRUE(virtualDeviceVec[i]->GetData(key, item) == E_OK);
606 EXPECT_EQ(item.value, value);
607 delete virtualDeviceVec[i];
608 virtualDeviceVec[i] = nullptr;
609 }
610 }
611
612 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, QueryRequestPacketTest001, TestSize.Level1)
613 {
614 /**
615 * @tc.steps: step1. prepare a QuerySyncRequestPacket.
616 */
617 auto packet = new (std::nothrow) DataRequestPacket;
618 ASSERT_TRUE(packet != nullptr);
619 auto kvEntry = new (std::nothrow) GenericSingleVerKvEntry;
620 ASSERT_TRUE(kvEntry != nullptr);
621 kvEntry->SetTimestamp(1);
622 SyncEntry syncData {.entries = {kvEntry}};
623 #ifndef OMIT_ZLIB
624 ASSERT_TRUE(GenericSingleVerKvEntry::Compress(syncData.entries, syncData.compressedEntries,
625 {CompressAlgorithm::ZLIB, SOFTWARE_VERSION_CURRENT}) == E_OK);
626 packet->SetCompressAlgo(CompressAlgorithm::ZLIB);
627 packet->SetFlag(4); // set IS_COMPRESS_DATA flag true
628 #endif
629 packet->SetBasicInfo(-E_NOT_SUPPORT, SOFTWARE_VERSION_CURRENT, SyncModeType::QUERY_PUSH_PULL);
630 packet->SetData(syncData.entries);
631 packet->SetCompressData(syncData.compressedEntries);
632 packet->SetEndWaterMark(INT8_MAX);
633 packet->SetWaterMark(INT16_MAX, INT32_MAX, INT64_MAX);
634 QuerySyncObject syncQuery(Query::Select().PrefixKey({'2'}));
635 packet->SetQuery(syncQuery);
636 packet->SetQueryId(syncQuery.GetIdentify());
637 packet->SetReserved(std::vector<uint64_t> {INT8_MAX});
638
639 /**
640 * @tc.steps: step2. put the QuerySyncRequestPacket into a message.
641 */
642 Message msg;
643 msg.SetExternalObject(packet);
644 msg.SetMessageId(QUERY_SYNC_MESSAGE);
645 msg.SetMessageType(TYPE_REQUEST);
646
647 /**
648 * @tc.steps: step3. Serialization the message to a buffer.
649 */
650 int len = static_cast<int>(SingleVerSerializeManager::CalculateLen(&msg));
651 vector<uint8_t> buffer(len);
652 ASSERT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), &msg), E_OK);
653
654 /**
655 * @tc.steps: step4. DeSerialization the buffer to a message.
656 */
657 Message outMsg(QUERY_SYNC_MESSAGE);
658 outMsg.SetMessageType(TYPE_REQUEST);
659 ASSERT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), &outMsg), E_OK);
660
661 /**
662 * @tc.steps: step5. checkout the outMsg.
663 * @tc.expected: step5. outMsg equal the the in msg
664 */
665 auto outPacket = outMsg.GetObject<DataRequestPacket>();
666 EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
667 EXPECT_EQ(outPacket->GetMode(), SyncModeType::QUERY_PUSH_PULL);
668 EXPECT_EQ(outPacket->GetEndWaterMark(), static_cast<uint64_t>(INT8_MAX));
669 EXPECT_EQ(outPacket->GetLocalWaterMark(), static_cast<uint64_t>(INT16_MAX));
670 EXPECT_EQ(outPacket->GetPeerWaterMark(), static_cast<uint64_t>(INT32_MAX));
671 EXPECT_EQ(outPacket->GetDeletedWaterMark(), static_cast<uint64_t>(INT64_MAX));
672 #ifndef OMIT_ZLIB
673 EXPECT_EQ(outPacket->GetFlag(), static_cast<uint32_t>(4)); // check IS_COMPRESS_DATA flag true
674 #endif
675 EXPECT_EQ(outPacket->GetQueryId(), syncQuery.GetIdentify());
676 EXPECT_EQ(outPacket->GetReserved(), std::vector<uint64_t> {INT8_MAX});
677 EXPECT_EQ(outPacket->GetSendCode(), -E_NOT_SUPPORT);
678 EXPECT_EQ(outPacket->GetData()[0]->GetTimestamp(), 1u);
679 }
680
681 /**
682 * @tc.name: QueryRequestPacketTest002
683 * @tc.desc: Test exception branch of serialization.
684 * @tc.type: FUNC
685 * @tc.require:
686 * @tc.author: zhangshijie
687 */
688 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, SerializationManager001, TestSize.Level1)
689 {
690 /**
691 * @tc.steps: step1. call SingleVerSerializeManager::Serialization with buffer = nullptr or msg = nullptr
692 * @tc.expected:step1 return -E_MESSAGE_ID_ERROR
693 */
694 Message msg;
695 msg.SetMessageType(TYPE_INVALID);
696 vector<uint8_t> buffer(10); // 10 is test buffer len
697 EXPECT_EQ(SingleVerSerializeManager::Serialization(nullptr, buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
698 EXPECT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), nullptr), -E_MESSAGE_ID_ERROR);
699
700 /**
701 * @tc.steps: step2. call SingleVerSerializeManager::Serialization with invalid type message
702 * @tc.expected:step2 return -E_MESSAGE_ID_ERROR
703 */
704 EXPECT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
705
706 /**
707 * @tc.steps: step3. call SingleVerSerializeManager::DeSerialization with buffer = nullptr or msg = nullptr
708 * @tc.expected:step3 return -E_MESSAGE_ID_ERROR
709 */
710 EXPECT_EQ(SingleVerSerializeManager::DeSerialization(nullptr, buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
711 EXPECT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), nullptr), -E_MESSAGE_ID_ERROR);
712
713 /**
714 * @tc.steps: step4. call SingleVerSerializeManager::DeSerialization with invalid type message
715 * @tc.expected:step4 return -E_MESSAGE_ID_ERROR
716 */
717 EXPECT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
718 }
719
720 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, QueryAckPacketTest001, TestSize.Level1)
721 {
722 /**
723 * @tc.steps: step1. prepare a QuerySyncAckPacket.
724 */
725 DataAckPacket packet;
726 packet.SetVersion(SOFTWARE_VERSION_CURRENT);
727 packet.SetData(INT64_MAX);
728 packet.SetRecvCode(-E_NOT_SUPPORT);
729 std::vector<uint64_t> reserved = {INT8_MAX};
730 packet.SetReserved(reserved);
731
732 /**
733 * @tc.steps: step2. put the QuerySyncAckPacket into a message.
734 */
735 Message msg;
736 msg.SetCopiedObject(packet);
737 msg.SetMessageId(QUERY_SYNC_MESSAGE);
738 msg.SetMessageType(TYPE_RESPONSE);
739
740 /**
741 * @tc.steps: step3. Serialization the message to a buffer.
742 */
743 int len = static_cast<int>(SingleVerSerializeManager::CalculateLen(&msg));
744 LOGE("test leng = %d", len);
745 uint8_t *buffer = new (nothrow) uint8_t[len];
746 ASSERT_TRUE(buffer != nullptr);
747 int errCode = SingleVerSerializeManager::Serialization(buffer, len, &msg);
748 ASSERT_EQ(errCode, E_OK);
749
750 /**
751 * @tc.steps: step4. DeSerialization the buffer to a message.
752 */
753 Message outMsg;
754 outMsg.SetMessageId(QUERY_SYNC_MESSAGE);
755 outMsg.SetMessageType(TYPE_RESPONSE);
756 errCode = SingleVerSerializeManager::DeSerialization(buffer, len, &outMsg);
757 ASSERT_EQ(errCode, E_OK);
758
759 /**
760 * @tc.steps: step5. checkout the outMsg.
761 * @tc.expected: step5. outMsg equal the the in msg
762 */
763 auto outPacket = outMsg.GetObject<DataAckPacket>();
764 EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
765 EXPECT_EQ(outPacket->GetData(), static_cast<uint64_t>(INT64_MAX));
766 std::vector<uint64_t> reserved2 = {INT8_MAX};
767 EXPECT_EQ(outPacket->GetReserved(), reserved2);
768 EXPECT_EQ(outPacket->GetRecvCode(), -E_NOT_SUPPORT);
769 delete[] buffer;
770 }
771
772 /**
773 * @tc.name: GetQueryWaterMark 001
774 * @tc.desc: Test metaData save and get queryWaterMark.
775 * @tc.type: FUNC
776 * @tc.require:
777 * @tc.author: zhangqiquan
778 */
779 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark001, TestSize.Level1)
780 {
781 VirtualSingleVerSyncDBInterface storage;
782 Metadata meta;
783
784 /**
785 * @tc.steps: step1. initialize meta with storage
786 * @tc.expected: step1. E_OK
787 */
788 int errCode = meta.Initialize(&storage);
789 ASSERT_EQ(errCode, E_OK);
790
791 /**
792 * @tc.steps: step2. save receive and send watermark
793 * @tc.expected: step2. E_OK
794 */
795 WaterMark w1 = 1;
796 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", "", w1), E_OK);
797 EXPECT_EQ(meta.SetSendQueryWaterMark("Q1", "D1", "", w1), E_OK);
798
799 /**
800 * @tc.steps: step3. get receive and send watermark
801 * @tc.expected: step3. E_OK and get the latest value
802 */
803 WaterMark w = 0;
804 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", "", w), E_OK);
805 EXPECT_EQ(w1, w);
806 EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", "", w), E_OK);
807 EXPECT_EQ(w1, w);
808
809 /**
810 * @tc.steps: step4. set peer and local watermark
811 * @tc.expected: step4. E_OK
812 */
813 WaterMark w2 = 2;
814 EXPECT_EQ(meta.SaveLocalWaterMark("D1", "", w2), E_OK);
815 EXPECT_EQ(meta.SavePeerWaterMark("D1", "", w2, true), E_OK);
816
817 /**
818 * @tc.steps: step5. get receive and send watermark
819 * @tc.expected: step5. E_OK and get the w1
820 */
821 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", "", w), E_OK);
822 EXPECT_EQ(w2, w);
823 EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", "", w), E_OK);
824 EXPECT_EQ(w2, w);
825
826 /**
827 * @tc.steps: step6. set peer and local watermark
828 * @tc.expected: step6. E_OK
829 */
830 WaterMark w3 = 3;
831 EXPECT_EQ(meta.SaveLocalWaterMark("D2", "", w3), E_OK);
832 EXPECT_EQ(meta.SavePeerWaterMark("D2", "", w3, true), E_OK);
833
834 /**
835 * @tc.steps: step7. get receive and send watermark
836 * @tc.expected: step7. E_OK and get the w3
837 */
838 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q2", "D2", "", w), E_OK);
839 EXPECT_EQ(w3, w);
840 EXPECT_EQ(meta.GetSendQueryWaterMark("Q2", "D2", "", w), E_OK);
841 EXPECT_EQ(w3, w);
842
843 /**
844 * @tc.steps: step8. get not exit receive and send watermark
845 * @tc.expected: step8. E_OK and get the 0
846 */
847 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q3", "D3", "", w), E_OK);
848 EXPECT_EQ(w, 0u);
849 EXPECT_EQ(meta.GetSendQueryWaterMark("Q3", "D3", "", w), E_OK);
850 EXPECT_EQ(w, 0u);
851 }
852
853 /**
854 * @tc.name: GetQueryWaterMark 002
855 * @tc.desc: Test metaData save and get queryWaterMark after push or pull mode.
856 * @tc.type: FUNC
857 * @tc.require:
858 * @tc.author: zhangqiquan
859 */
860 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark002, TestSize.Level1)
861 {
862 VirtualSingleVerSyncDBInterface storage;
863 Metadata meta;
864
865 /**
866 * @tc.steps: step1. initialize meta with storage
867 * @tc.expected: step1. E_OK
868 */
869 int errCode = meta.Initialize(&storage);
870 ASSERT_EQ(errCode, E_OK);
871
872 /**
873 * @tc.steps: step2. set peer and local watermark
874 * @tc.expected: step2. E_OK
875 */
876 WaterMark w1 = 2;
877 EXPECT_EQ(meta.SaveLocalWaterMark("D1", "", w1), E_OK);
878 EXPECT_EQ(meta.SavePeerWaterMark("D1", "", w1, true), E_OK);
879
880 /**
881 * @tc.steps: step2. save receive and send watermark
882 * @tc.expected: step2. E_OK
883 */
884 WaterMark w2 = 1;
885 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", "", w2), E_OK);
886 EXPECT_EQ(meta.SetSendQueryWaterMark("Q1", "D1", "", w2), E_OK);
887
888 /**
889 * @tc.steps: step3. get receive and send watermark
890 * @tc.expected: step3. E_OK and get the bigger value
891 */
892 WaterMark w = 0;
893 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", "", w), E_OK);
894 EXPECT_EQ(w1, w);
895 EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", "", w), E_OK);
896 EXPECT_EQ(w1, w);
897 }
898
899 /**
900 * @tc.name: GetQueryWaterMark 003
901 * @tc.desc: check time offset after remove water mark
902 * @tc.type: FUNC
903 * @tc.require:
904 * @tc.author: lianhuix
905 */
906 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark003, TestSize.Level1)
907 {
908 VirtualSingleVerSyncDBInterface storage;
909 Metadata meta;
910
911 int errCode = meta.Initialize(&storage);
912 ASSERT_EQ(errCode, E_OK);
913
914 const std::string DEVICE_B = "DEVICE_B";
915 TimeOffset offset = 100; // 100: offset
916 meta.SaveTimeOffset(DEVICE_B, "", offset);
917
918 WaterMark w1 = 2; // 2: watermark
919 meta.SavePeerWaterMark(DBCommon::TransferHashString(DEVICE_B), "", w1, false);
920
921 TimeOffset offsetGot;
922 meta.GetTimeOffset(DEVICE_B, "", offsetGot);
923 EXPECT_EQ(offsetGot, offset);
924 }
925
926 /**
927 * @tc.name: GetDeleteWaterMark001
928 * @tc.desc: Test metaData save and get deleteWaterMark.
929 * @tc.type: FUNC
930 * @tc.require:
931 * @tc.author: zhangqiquan
932 */
933 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteWaterMark001, TestSize.Level1)
934 {
935 VirtualSingleVerSyncDBInterface storage;
936 Metadata meta;
937
938 /**
939 * @tc.steps: step1. initialize meta with storage
940 * @tc.expected: step1. E_OK
941 */
942 int errCode = meta.Initialize(&storage);
943 ASSERT_EQ(errCode, E_OK);
944
945 /**
946 * @tc.steps: step2. set and get recv/send delete watermark
947 * @tc.expected: step2. set E_OK and get water mark is equal with last set
948 */
949 const std::string device = "DEVICE";
950 const WaterMark maxWaterMark = 1000u;
__anonfb641fa10202() 951 std::thread recvThread([&meta, &device, &maxWaterMark]() {
952 for (WaterMark expectRecv = 0u; expectRecv < maxWaterMark; ++expectRecv) {
953 EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark(device, "", expectRecv), E_OK);
954 WaterMark actualRecv = 0u;
955 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark(device, "", actualRecv), E_OK);
956 EXPECT_EQ(actualRecv, expectRecv);
957 }
958 });
__anonfb641fa10302() 959 std::thread sendThread([&meta, &device, &maxWaterMark]() {
960 for (WaterMark expectSend = 0u; expectSend < maxWaterMark; ++expectSend) {
961 EXPECT_EQ(meta.SetSendDeleteSyncWaterMark(device, "", expectSend), E_OK);
962 WaterMark actualSend = 0u;
963 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark(device, "", actualSend), E_OK);
964 EXPECT_EQ(actualSend, expectSend);
965 }
966 });
967 recvThread.join();
968 sendThread.join();
969 }
970
971 /**
972 * @tc.name: ClearQueryWaterMark 001
973 * @tc.desc: Test metaData clear watermark function.
974 * @tc.type: FUNC
975 * @tc.require:
976 * @tc.author: zhangqiquan
977 */
978 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark001, TestSize.Level1)
979 {
980 VirtualSingleVerSyncDBInterface storage;
981 Metadata meta;
982
983 /**
984 * @tc.steps: step1. initialize meta with storage
985 * @tc.expected: step1. E_OK
986 */
987 int errCode = meta.Initialize(&storage);
988 ASSERT_EQ(errCode, E_OK);
989
990 /**
991 * @tc.steps: step2. save receive watermark
992 * @tc.expected: step2. E_OK
993 */
994 WaterMark w1 = 1;
995 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", "", w1), E_OK);
996
997 /**
998 * @tc.steps: step3. erase peer watermark
999 * @tc.expected: step3. E_OK
1000 */
1001 EXPECT_EQ(meta.EraseDeviceWaterMark("D1", true), E_OK);
1002
1003 /**
1004 * @tc.steps: step4. get receive watermark
1005 * @tc.expected: step4. E_OK receive watermark is zero
1006 */
1007 WaterMark w2 = -1;
1008 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", "", w2), E_OK);
1009 EXPECT_EQ(w2, 0u);
1010
1011 /**
1012 * @tc.steps: step5. set peer watermark
1013 * @tc.expected: step5. E_OK
1014 */
1015 WaterMark w3 = 2;
1016 EXPECT_EQ(meta.SavePeerWaterMark("D1", "", w3, true), E_OK);
1017
1018 /**
1019 * @tc.steps: step6. get receive watermark
1020 * @tc.expected: step6. E_OK receive watermark is peer watermark
1021 */
1022 WaterMark w4 = -1;
1023 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", "", w4), E_OK);
1024 EXPECT_EQ(w4, w3);
1025 }
1026
1027 /**
1028 * @tc.name: ClearQueryWaterMark 002
1029 * @tc.desc: Test metaData clear watermark function.
1030 * @tc.type: FUNC
1031 * @tc.require:
1032 * @tc.author: zhangqiquan
1033 */
1034 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark002, TestSize.Level1)
1035 {
1036 VirtualSingleVerSyncDBInterface storage;
1037 Metadata meta;
1038
1039 /**
1040 * @tc.steps: step1. initialize meta with storage
1041 * @tc.expected: step1. E_OK
1042 */
1043 int errCode = meta.Initialize(&storage);
1044 ASSERT_EQ(errCode, E_OK);
1045
1046 /**
1047 * @tc.steps: step2. save receive watermark
1048 * @tc.expected: step2. E_OK
1049 */
1050 WaterMark w1 = 1;
1051 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", "", w1), E_OK);
1052 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q2", "D1", "", w1), E_OK);
1053 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D2", "", w1), E_OK);
1054
1055 /**
1056 * @tc.steps: step3. erase peer watermark, make sure data remove in db
1057 * @tc.expected: step3. E_OK
1058 */
1059 Metadata anotherMeta;
1060 ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1061 EXPECT_EQ(anotherMeta.EraseDeviceWaterMark("D1", true), E_OK);
1062
1063 /**
1064 * @tc.steps: step4. get receive watermark
1065 * @tc.expected: step4. E_OK receive watermark is zero
1066 */
1067 WaterMark w2 = -1;
1068 EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q1", "D1", "", w2), E_OK);
1069 EXPECT_EQ(w2, 0u);
1070 w2 = -1;
1071 EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q2", "D1", "", w2), E_OK);
1072 EXPECT_EQ(w2, 0u);
1073 w2 = -1;
1074 EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q1", "D2", "", w2), E_OK);
1075 EXPECT_EQ(w2, w1);
1076 }
1077
1078 /**
1079 * @tc.name: ClearQueryWaterMark 003
1080 * @tc.desc: Test metaData clear watermark busy.
1081 * @tc.type: FUNC
1082 * @tc.require:
1083 * @tc.author: zhangqiquan
1084 */
1085 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark003, TestSize.Level1)
1086 {
1087 VirtualSingleVerSyncDBInterface storage;
1088 Metadata meta;
1089 /**
1090 * @tc.steps: step1. initialize meta with storage
1091 * @tc.expected: step1. E_OK
1092 */
1093 int errCode = meta.Initialize(&storage);
1094 ASSERT_EQ(errCode, E_OK);
1095 /**
1096 * @tc.steps: step2. set busy and erase water mark
1097 * @tc.expected: step2. -E_BUSY
1098 */
1099 storage.SetBusy(false, true);
1100 EXPECT_EQ(meta.EraseDeviceWaterMark("DEVICE_ID", true), -E_BUSY);
1101 }
1102
1103 /**
1104 * @tc.name: GetQueryLastTimestamp001
1105 * @tc.desc: Test function of GetQueryLastTimestamp.
1106 * @tc.type: FUNC
1107 * @tc.require:
1108 * @tc.author: zhangshijie
1109 */
1110 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryLastTimestamp001, TestSize.Level1)
1111 {
1112 /**
1113 * @tc.steps: step1. initialize meta with nullptr
1114 * @tc.expected: step1. return -E_INVALID_DB
1115 */
1116 Metadata meta;
1117 EXPECT_EQ(meta.Initialize(nullptr), -E_INVALID_DB);
1118
1119 /**
1120 * @tc.steps: step2. initialize meta with storage
1121 * @tc.expected: step2. E_OK
1122 */
1123 VirtualSingleVerSyncDBInterface storage;
1124 int errCode = meta.Initialize(&storage);
1125 ASSERT_EQ(errCode, E_OK);
1126
1127 /**
1128 * @tc.steps: step3. call GetQueryLastTimestamp with a non-exists device
1129 * @tc.expected: step3. return INT64_MAX
1130 */
1131 EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q1"), static_cast<uint64_t>(INT64_MAX));
1132
1133 /**
1134 * @tc.steps: step4. call GetQueryLastTimestamp with device D1 again
1135 * @tc.expected: step4. return 0
1136 */
1137 EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q1"), 0u);
1138
1139 /**
1140 * @tc.steps: step5. call GetQueryLastTimestamp with device D1 and Q2
1141 * @tc.expected: step5. return INT64_MAX
1142 */
1143 EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q2"), static_cast<uint64_t>(INT64_MAX));
1144 }
1145
1146 /**
1147 * @tc.name: GetQueryLastTimestamp002
1148 * @tc.desc: Test Metadata::GetQueryLastTimestamp when timestamp out of INT64 range.
1149 * @tc.type: FUNC
1150 * @tc.author: liuhongyang
1151 */
1152 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryLastTimestamp002, TestSize.Level1)
1153 {
1154 /**
1155 * @tc.steps: step1. initialize storage with fake meta data
1156 * @tc.expected: step1. E_OK
1157 */
1158 const int64_t validInt64Num1 = 3000000000000000000; // a random valid int64 number
1159 const int64_t validInt64Num2 = 3000000000000000001; // a random valid int64 number
1160 // key is queryId, value is: [stored timestamp in db, expect return value of GetQueryLastTimestamp]
1161 std::map<std::string, std::pair<std::string, int64_t>> idValueMap = {
1162 {"regular1", {"3000000000000000000", validInt64Num1}},
1163 {"max", {"9223372036854775807", INT64_MAX}},
1164 {"min", {"-9223372036854775808", INT64_MIN}},
1165 {"overMax", {"9223372036854775808", INT64_MAX}},
1166 {"underMin", {"-9223372036854775809", INT64_MIN}},
1167 {"regular2", {"3000000000000000001", validInt64Num2}}};
1168 Key metaKey;
1169 Value value;
1170 VirtualSingleVerSyncDBInterface storage;
1171 for (auto &pair : idValueMap) {
1172 std::string keyStr = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(pair.first);
1173 DBCommon::StringToVector(keyStr, metaKey);
1174 DBCommon::StringToVector(pair.second.first, value);
1175 EXPECT_EQ(storage.PutMetaData(metaKey, value, false), E_OK);
1176 }
1177 /**
1178 * @tc.steps: step2. call GetQueryLastTimestamp with different query id
1179 * @tc.expected: step2. get the correct return value
1180 */
1181 Metadata meta;
1182 int errCode = meta.Initialize(&storage);
1183 ASSERT_EQ(errCode, E_OK);
1184 for (auto &pair : idValueMap) {
1185 auto &queryId = pair.first;
1186 auto &expectVal = pair.second.second;
1187 EXPECT_EQ(meta.GetQueryLastTimestamp("any", queryId), static_cast<uint64_t>(expectVal));
1188 }
1189 }
1190
1191 /**
1192 * @tc.name: MetaDataExceptionBranch001
1193 * @tc.desc: Test execption branch of meata data.
1194 * @tc.type: FUNC
1195 * @tc.require:
1196 * @tc.author: zhangshijie
1197 */
1198 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, MetaDataExceptionBranch001, TestSize.Level1)
1199 {
1200 /**
1201 * @tc.steps: step1. call GetRemoveDataMark with a device not in map
1202 * @tc.expected: step1. out value = 0
1203 */
1204 Metadata meta;
1205 VirtualSingleVerSyncDBInterface storage;
1206 int errCode = meta.Initialize(&storage);
1207 ASSERT_EQ(errCode, E_OK);
1208
1209 uint64_t val = 99; // 99 is the initial value of outValue
1210 uint64_t outValue = val;
1211 meta.GetRemoveDataMark("D1", "", outValue);
1212 EXPECT_EQ(outValue, 0u);
1213
1214 /**
1215 * @tc.steps: step2. reset outValue, call GetDbCreateTime with a device not in map
1216 * @tc.expected: step2. out value = 0
1217 */
1218 outValue = val;
1219 meta.GetDbCreateTime("D1", "", outValue);
1220 EXPECT_EQ(outValue, 0u);
1221
1222 /**
1223 * @tc.steps: step3. call ResetMetaDataAfterRemoveData with a device not in map
1224 * @tc.expected: step3. return -E_NOT_FOUND
1225 */
1226 EXPECT_EQ(meta.ResetMetaDataAfterRemoveData("D1", ""), -E_NOT_FOUND);
1227 }
1228
1229 /**
1230 * @tc.name: GetDeleteKeyWaterMark 001
1231 * @tc.desc: Test metaData save and get deleteWaterMark.
1232 * @tc.type: FUNC
1233 * @tc.require:
1234 * @tc.author: zhangqiquan
1235 */
1236 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteKeyWaterMark001, TestSize.Level1)
1237 {
1238 VirtualSingleVerSyncDBInterface storage;
1239 Metadata meta;
1240
1241 /**
1242 * @tc.steps: step1. initialize meta with storage
1243 * @tc.expected: step1. E_OK
1244 */
1245 int errCode = meta.Initialize(&storage);
1246 ASSERT_EQ(errCode, E_OK);
1247
1248 /**
1249 * @tc.steps: step2. save receive and send watermark
1250 * @tc.expected: step2. E_OK
1251 */
1252 WaterMark w1 = 1;
1253 EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", "", w1), E_OK);
1254 EXPECT_EQ(meta.SetSendDeleteSyncWaterMark("D1", "", w1), E_OK);
1255
1256 /**
1257 * @tc.steps: step3. get receive and send watermark
1258 * @tc.expected: step3. E_OK and get the latest value
1259 */
1260 WaterMark w = 0;
1261 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", "", w), E_OK);
1262 EXPECT_EQ(w1, w);
1263 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", "", w), E_OK);
1264 EXPECT_EQ(w1, w);
1265
1266 /**
1267 * @tc.steps: step4. set peer and local watermark
1268 * @tc.expected: step4. E_OK
1269 */
1270 WaterMark w2 = 2;
1271 EXPECT_EQ(meta.SaveLocalWaterMark("D1", "", w2), E_OK);
1272 EXPECT_EQ(meta.SavePeerWaterMark("D1", "", w2, true), E_OK);
1273
1274 /**
1275 * @tc.steps: step5. get receive and send watermark
1276 * @tc.expected: step5. E_OK and get the w2
1277 */
1278 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", "", w), E_OK);
1279 EXPECT_EQ(w2, w);
1280 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", "", w), E_OK);
1281 EXPECT_EQ(w2, w);
1282
1283 /**
1284 * @tc.steps: step6. set peer and local watermark
1285 * @tc.expected: step6. E_OK
1286 */
1287 WaterMark w3 = 3;
1288 EXPECT_EQ(meta.SaveLocalWaterMark("D2", "", w3), E_OK);
1289 EXPECT_EQ(meta.SavePeerWaterMark("D2", "", w3, true), E_OK);
1290
1291 /**
1292 * @tc.steps: step7. get receive and send watermark
1293 * @tc.expected: step7. E_OK and get the w3
1294 */
1295 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D2", "", w), E_OK);
1296 EXPECT_EQ(w3, w);
1297 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D2", "", w), E_OK);
1298 EXPECT_EQ(w3, w);
1299
1300 /**
1301 * @tc.steps: step8. get not exit receive and send watermark
1302 * @tc.expected: step8. E_OK and get the 0
1303 */
1304 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D3", "", w), E_OK);
1305 EXPECT_EQ(w, 0u);
1306 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D3", "", w), E_OK);
1307 EXPECT_EQ(w, 0u);
1308 }
1309
1310 /**
1311 * @tc.name: GetDeleteKeyWaterMark 002
1312 * @tc.desc: Test metaData save and get deleteWaterMark after push or pull mode.
1313 * @tc.type: FUNC
1314 * @tc.require:
1315 * @tc.author: zhangqiquan
1316 */
1317 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteKeyWaterMark002, TestSize.Level1)
1318 {
1319 VirtualSingleVerSyncDBInterface storage;
1320 Metadata meta;
1321
1322 /**
1323 * @tc.steps: step1. initialize meta with storage
1324 * @tc.expected: step1. E_OK
1325 */
1326 int errCode = meta.Initialize(&storage);
1327 ASSERT_EQ(errCode, E_OK);
1328
1329 /**
1330 * @tc.steps: step2. set peer and local watermark
1331 * @tc.expected: step2. E_OK
1332 */
1333 WaterMark w1 = 3;
1334 EXPECT_EQ(meta.SaveLocalWaterMark("D1", "", w1), E_OK);
1335 EXPECT_EQ(meta.SavePeerWaterMark("D1", "", w1, true), E_OK);
1336
1337 /**
1338 * @tc.steps: step2. save receive and send watermark
1339 * @tc.expected: step2. E_OK
1340 */
1341 WaterMark w2 = 1;
1342 EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", "", w2), E_OK);
1343 EXPECT_EQ(meta.SetSendDeleteSyncWaterMark("D1", "", w2), E_OK);
1344
1345 /**
1346 * @tc.steps: step3. get receive and send watermark
1347 * @tc.expected: step3. E_OK and get the bigger value
1348 */
1349 WaterMark w = 0;
1350 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", "", w), E_OK);
1351 EXPECT_EQ(w1, w);
1352 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", "", w), E_OK);
1353 EXPECT_EQ(w1, w);
1354 }
1355
1356 /**
1357 * @tc.name: ClearDeleteKeyWaterMark 001
1358 * @tc.desc: Test metaData clear watermark function.
1359 * @tc.type: FUNC
1360 * @tc.require:
1361 * @tc.author: zhangqiquan
1362 */
1363 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearDeleteKeyWaterMark001, TestSize.Level1)
1364 {
1365 VirtualSingleVerSyncDBInterface storage;
1366 Metadata meta;
1367
1368 /**
1369 * @tc.steps: step1. initialize meta with storage
1370 * @tc.expected: step1. E_OK
1371 */
1372 int errCode = meta.Initialize(&storage);
1373 ASSERT_EQ(errCode, E_OK);
1374
1375 /**
1376 * @tc.steps: step2. save receive watermark
1377 * @tc.expected: step2. E_OK
1378 */
1379 WaterMark w1 = 1;
1380 EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", "", w1), E_OK);
1381
1382 /**
1383 * @tc.steps: step3. erase peer watermark
1384 * @tc.expected: step3. E_OK
1385 */
1386 EXPECT_EQ(meta.EraseDeviceWaterMark("D1", true), E_OK);
1387
1388 /**
1389 * @tc.steps: step4. get receive watermark
1390 * @tc.expected: step4. E_OK receive watermark is zero
1391 */
1392 WaterMark w2 = -1;
1393 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", "", w2), E_OK);
1394 EXPECT_EQ(w2, 0u);
1395
1396 /**
1397 * @tc.steps: step5. set peer watermark
1398 * @tc.expected: step5. E_OK
1399 */
1400 WaterMark w3 = 2;
1401 EXPECT_EQ(meta.SavePeerWaterMark("D1", "", w3, true), E_OK);
1402
1403 /**
1404 * @tc.steps: step6. get receive watermark
1405 * @tc.expected: step6. E_OK receive watermark is peer watermark
1406 */
1407 WaterMark w4 = -1;
1408 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", "", w4), E_OK);
1409 EXPECT_EQ(w4, w3);
1410 }
1411
1412 /**
1413 * @tc.name: VerifyCacheAndDb 001
1414 * @tc.desc: Test metaData watermark cache and db are consistent and correct.
1415 * @tc.type: FUNC
1416 * @tc.require:
1417 * @tc.author: zhangqiquan
1418 */
1419 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDataQuerySync001, TestSize.Level1)
1420 {
1421 Metadata meta;
1422 VirtualSingleVerSyncDBInterface storage;
1423
1424 /**
1425 * @tc.steps: step1. initialize meta with storage
1426 * @tc.expected: step1. E_OK
1427 */
1428 int errCode = meta.Initialize(&storage);
1429 ASSERT_EQ(errCode, E_OK);
1430
1431 const std::string deviceId = "D1";
1432 const std::string queryId = "Q1";
1433
1434 /**
1435 * @tc.steps: step2. save deleteSync watermark
1436 * @tc.expected: step2. E_OK
1437 */
1438 WaterMark deleteWaterMark = 1;
1439 EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark(deviceId, "", deleteWaterMark), E_OK);
1440 EXPECT_EQ(meta.SetSendDeleteSyncWaterMark(deviceId, "", deleteWaterMark), E_OK);
1441
1442 /**
1443 * @tc.steps: step3. save querySync watermark
1444 * @tc.expected: step2. E_OK
1445 */
1446 WaterMark queryWaterMark = 2;
1447 EXPECT_EQ(meta.SetRecvQueryWaterMark(queryId, deviceId, "", queryWaterMark), E_OK);
1448 EXPECT_EQ(meta.SetSendQueryWaterMark(queryId, deviceId, "", queryWaterMark), E_OK);
1449
1450 /**
1451 * @tc.steps: step4. initialize meta with storage
1452 * @tc.expected: step4. E_OK
1453 */
1454 Metadata anotherMeta;
1455 ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1456
1457 /**
1458 * @tc.steps: step5. verify delete sync data
1459 * @tc.expected: step5. E_OK and waterMark equal to deleteWaterMark
1460 */
1461 WaterMark waterMark;
1462 EXPECT_EQ(anotherMeta.GetRecvDeleteSyncWaterMark(deviceId, "", waterMark), E_OK);
1463 EXPECT_EQ(waterMark, deleteWaterMark);
1464 EXPECT_EQ(anotherMeta.GetSendDeleteSyncWaterMark(deviceId, "", waterMark), E_OK);
1465 EXPECT_EQ(waterMark, deleteWaterMark);
1466
1467 /**
1468 * @tc.steps: step6. verify query sync data
1469 * @tc.expected: step6. E_OK and waterMark equal to queryWaterMark
1470 */
1471 EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark(queryId, deviceId, "", waterMark), E_OK);
1472 EXPECT_EQ(waterMark, queryWaterMark);
1473 EXPECT_EQ(anotherMeta.GetSendQueryWaterMark(queryId, deviceId, "", waterMark), E_OK);
1474 EXPECT_EQ(waterMark, queryWaterMark);
1475 }
1476
1477 /**
1478 * @tc.name: VerifyLruMap 001
1479 * @tc.desc: Test metaData watermark cache lru ability.
1480 * @tc.type: FUNC
1481 * @tc.require:
1482 * @tc.author: zhangqiquan
1483 */
1484 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyLruMap001, TestSize.Level1)
1485 {
1486 LruMap<std::string, QueryWaterMark> lruMap;
1487 const int maxCacheItems = 200;
1488
1489 /**
1490 * @tc.steps: step1. fill items to LruMap
1491 * @tc.expected: step1. E_OK
1492 */
1493 const int startCount = 0;
1494 for (int i = startCount; i < maxCacheItems; i++) {
1495 std::string key = std::to_string(i);
1496 QueryWaterMark value;
1497 value.recvWaterMark = static_cast<uint64_t>(i + 1);
1498 EXPECT_EQ(lruMap.Put(key, value), E_OK);
1499 }
1500
1501 /**
1502 * @tc.steps: step2. get the first item
1503 * @tc.expected: step2. E_OK first item will move to last
1504 */
1505 std::string firstItemKey = std::to_string(startCount);
1506 QueryWaterMark firstItemValue;
1507 EXPECT_EQ(lruMap.Get(firstItemKey, firstItemValue), E_OK);
1508 EXPECT_EQ(firstItemValue.recvWaterMark, 1u);
1509
1510 /**
1511 * @tc.steps: step3. insert new items to LruMap
1512 * @tc.expected: step3. the second items was removed
1513 */
1514 std::string key = std::to_string(maxCacheItems);
1515 QueryWaterMark value;
1516 value.recvWaterMark = maxCacheItems;
1517 EXPECT_EQ(lruMap.Put(key, value), E_OK);
1518
1519 /**
1520 * @tc.steps: step4. get the second item
1521 * @tc.expected: step4. E_NOT_FOUND it was removed by algorithm
1522 */
1523 std::string secondItemKey = std::to_string(startCount + 1);
1524 QueryWaterMark secondItemValue;
1525 EXPECT_EQ(lruMap.Get(secondItemKey, secondItemValue), -E_NOT_FOUND);
1526 EXPECT_EQ(secondItemValue.recvWaterMark, 0u);
1527 }
1528
1529 /**
1530 * @tc.name: VerifyMetaDataInit 001
1531 * @tc.desc: Test metaData init correctly
1532 * @tc.type: FUNC
1533 * @tc.require:
1534 * @tc.author: zhangqiquan
1535 */
1536 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDataInit001, TestSize.Level1)
1537 {
1538 Metadata meta;
1539 VirtualSingleVerSyncDBInterface storage;
1540
1541 /**
1542 * @tc.steps: step1. initialize meta with storage
1543 * @tc.expected: step1. E_OK
1544 */
1545 ASSERT_EQ(meta.Initialize(&storage), E_OK);
1546
1547 DeviceID deviceA = "DeviceA";
1548 DeviceID deviceB = "DeviceA";
1549 WaterMark setWaterMark = 1;
1550
1551 /**
1552 * @tc.steps: step2. meta save and get waterMark
1553 * @tc.expected: step2. expect get the same waterMark
1554 */
1555 EXPECT_EQ(meta.SaveLocalWaterMark(deviceA, "", setWaterMark), E_OK);
1556 EXPECT_EQ(meta.SaveLocalWaterMark(deviceB, "", setWaterMark), E_OK);
1557 WaterMark getWaterMark = 0;
1558 meta.GetLocalWaterMark(deviceA, "", getWaterMark);
1559 EXPECT_EQ(getWaterMark, setWaterMark);
1560 meta.GetLocalWaterMark(deviceB, "", getWaterMark);
1561 EXPECT_EQ(getWaterMark, setWaterMark);
1562
1563
1564 /**
1565 * @tc.steps: step3. init again
1566 * @tc.expected: step3. E_OK
1567 */
1568 Metadata anotherMeta;
1569 ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1570
1571 /**
1572 * @tc.steps: step4. get waterMark again
1573 * @tc.expected: step4. expect get the same waterMark
1574 */
1575 anotherMeta.GetLocalWaterMark(deviceA, "", getWaterMark);
1576 EXPECT_EQ(getWaterMark, setWaterMark);
1577 anotherMeta.GetLocalWaterMark(deviceB, "", getWaterMark);
1578 EXPECT_EQ(getWaterMark, setWaterMark);
1579 }
1580
1581 namespace {
InitVerifyStorageEnvironment(Metadata & meta,VirtualSingleVerSyncDBInterface & storage,const std::string & deviceId,const int & startCount,const uint32_t & maxStoreItems)1582 void InitVerifyStorageEnvironment(Metadata &meta, VirtualSingleVerSyncDBInterface &storage,
1583 const std::string &deviceId, const int &startCount, const uint32_t &maxStoreItems)
1584 {
1585 /**
1586 * @tc.steps: step1. initialize meta with storage
1587 * @tc.expected: step1. E_OK
1588 */
1589 ASSERT_EQ(meta.Initialize(&storage), E_OK);
1590
1591 /**
1592 * @tc.steps: step2. fill items to metadata
1593 * @tc.expected: step2. E_OK
1594 */
1595 for (uint32_t i = startCount; i < maxStoreItems; i++) {
1596 std::string queryId = std::to_string(i);
1597 WaterMark recvWaterMark = i + 1;
1598 EXPECT_EQ(meta.SetRecvQueryWaterMark(queryId, deviceId, "", recvWaterMark), E_OK);
1599 }
1600 }
1601 }
1602
1603 /**
1604 * @tc.name: VerifyManagerQuerySyncStorage 001
1605 * @tc.desc: Test metaData remove least used querySync storage items.
1606 * @tc.type: FUNC
1607 * @tc.require:
1608 * @tc.author: zhangqiquan
1609 */
1610 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyManagerQuerySyncStorage001, TestSize.Level3)
1611 {
1612 Metadata meta;
1613 VirtualSingleVerSyncDBInterface storage;
1614 const uint32_t maxStoreItems = 100000;
1615 const int startCount = 0;
1616 const std::string deviceId = "Device";
1617
1618 InitVerifyStorageEnvironment(meta, storage, deviceId, startCount, maxStoreItems);
1619
1620 /**
1621 * @tc.steps: step3. insert new items to metadata
1622 * @tc.expected: step3. E_OK
1623 */
1624 std::string newQueryId = std::to_string(maxStoreItems);
1625 WaterMark newWaterMark = maxStoreItems + 1;
1626 EXPECT_EQ(meta.SetRecvQueryWaterMark(newQueryId, deviceId, "", newWaterMark), E_OK);
1627
1628 /**
1629 * @tc.steps: step4. touch the first item
1630 * @tc.expected: step4. E_OK update first item used time
1631 */
1632 std::string firstItemKey = std::to_string(startCount);
1633 WaterMark firstWaterMark = 11u;
1634 EXPECT_EQ(meta.SetRecvQueryWaterMark(firstItemKey, deviceId, "", firstWaterMark), E_OK);
1635
1636 /**
1637 * @tc.steps: step5. initialize new meta with storage
1638 * @tc.expected: step5. the second item will be removed
1639 */
1640 Metadata newMeta;
1641 ASSERT_EQ(newMeta.Initialize(&storage), E_OK);
1642
1643 /**
1644 * @tc.steps: step6. touch the first item
1645 * @tc.expected: step6. E_OK it still exist
1646 */
1647 WaterMark exceptWaterMark;
1648 EXPECT_EQ(newMeta.GetRecvQueryWaterMark(firstItemKey, deviceId, "", exceptWaterMark), E_OK);
1649 EXPECT_EQ(exceptWaterMark, firstWaterMark);
1650
1651 /**
1652 * @tc.steps: step7. get the second item
1653 * @tc.expected: step7. NOT_FOUND secondWaterMark is zero
1654 */
1655 WaterMark secondWaterMark;
1656 std::string secondQueryId = std::to_string(startCount + 1);
1657 EXPECT_EQ(newMeta.GetRecvQueryWaterMark(secondQueryId, deviceId, "", secondWaterMark), E_OK);
1658 EXPECT_EQ(secondWaterMark, 0u);
1659 }
1660
1661 /**
1662 * @tc.name: VerifyMetaDbCreateTime 001
1663 * @tc.desc: Test metaData get and set cbCreateTime.
1664 * @tc.type: FUNC
1665 * @tc.require:
1666 * @tc.author: zhuwentao
1667 */
1668 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDbCreateTime001, TestSize.Level1)
1669 {
1670 Metadata meta;
1671 VirtualSingleVerSyncDBInterface storage;
1672 /**
1673 * @tc.steps: step1. initialize meta with storage
1674 * @tc.expected: step1. E_OK
1675 */
1676 int errCode = meta.Initialize(&storage);
1677 ASSERT_EQ(errCode, E_OK);
1678 /**
1679 * @tc.steps: step2. set local and peer watermark and dbCreateTime
1680 * @tc.expected: step4. E_OK
1681 */
1682 WaterMark value = 2;
1683 EXPECT_EQ(meta.SaveLocalWaterMark("D1", "", value), E_OK);
1684 EXPECT_EQ(meta.SavePeerWaterMark("D1", "", value, true), E_OK);
1685 EXPECT_EQ(meta.SetDbCreateTime("D1", "", 10u, true), E_OK);
1686 /**
1687 * @tc.steps: step3. check peer and local watermark and dbCreateTime
1688 * @tc.expected: step4. E_OK
1689 */
1690 WaterMark curValue = 0;
1691 meta.GetLocalWaterMark("D1", "", curValue);
1692 EXPECT_EQ(value, curValue);
1693 meta.GetPeerWaterMark("D1", "", curValue);
1694 EXPECT_EQ(value, curValue);
1695 uint64_t curDbCreatTime = 0;
1696 meta.GetDbCreateTime("D1", "", curDbCreatTime);
1697 EXPECT_EQ(curDbCreatTime, 10u);
1698 /**
1699 * @tc.steps: step3. change dbCreateTime and check
1700 * @tc.expected: step4. E_OK
1701 */
1702 EXPECT_EQ(meta.SetDbCreateTime("D1", "", 20u, true), E_OK);
1703 uint64_t clearDeviceDataMark = INT_MAX;
1704 meta.GetRemoveDataMark("D1", "", clearDeviceDataMark);
1705 EXPECT_EQ(clearDeviceDataMark, 1u);
1706 EXPECT_EQ(meta.ResetMetaDataAfterRemoveData("D1", ""), E_OK);
1707 meta.GetRemoveDataMark("D1", "", clearDeviceDataMark);
1708 EXPECT_EQ(clearDeviceDataMark, 0u);
1709 meta.GetDbCreateTime("D1", "", curDbCreatTime);
1710 EXPECT_EQ(curDbCreatTime, 20u);
1711 }
1712
1713 /**
1714 * @tc.name: VerifyManagerQuerySyncStorage 002
1715 * @tc.desc: Test metaData remove least used querySync storage items when exit wrong data.
1716 * @tc.type: FUNC
1717 * @tc.require:
1718 * @tc.author: zhangqiquan
1719 */
1720 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyManagerQuerySyncStorage002, TestSize.Level3)
1721 {
1722 Metadata meta;
1723 VirtualSingleVerSyncDBInterface storage;
1724 const uint32_t maxStoreItems = 100000;
1725 const int startCount = 0;
1726 const std::string deviceId = "Device";
1727
1728 InitVerifyStorageEnvironment(meta, storage, deviceId, startCount, maxStoreItems);
1729
1730 /**
1731 * @tc.steps: step3. insert a wrong Value
1732 * @tc.expected: step3. E_OK
1733 */
1734 std::string newQueryId = std::to_string(maxStoreItems);
1735 Key dbKey;
1736 DBCommon::StringToVector(QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey()
1737 + DBCommon::TransferHashString(deviceId) + newQueryId, dbKey);
1738 Value wrongValue;
1739 EXPECT_EQ(storage.PutMetaData(dbKey, wrongValue, false), E_OK);
1740
1741 /**
1742 * @tc.steps: step4. initialize new meta with storage
1743 * @tc.expected: step4. E_OK
1744 */
1745 Metadata newMeta;
1746 ASSERT_EQ(newMeta.Initialize(&storage), E_OK);
1747
1748 /**
1749 * @tc.steps: step5. touch the first item
1750 * @tc.expected: step5. E_OK still exit
1751 */
1752 std::string firstItemKey = std::to_string(startCount);
1753 WaterMark exceptWaterMark;
1754 EXPECT_EQ(newMeta.GetRecvQueryWaterMark(firstItemKey, deviceId, "", exceptWaterMark), E_OK);
1755 EXPECT_EQ(exceptWaterMark, 1u);
1756 }
1757
1758 /**
1759 * @tc.name: AllPredicateQuerySync001
1760 * @tc.desc: Test normal push sync for AllPredicate data.
1761 * @tc.type: FUNC
1762 * @tc.require:
1763 * @tc.author: zhuwentao
1764 */
1765 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync001, TestSize.Level1)
1766 {
1767 /**
1768 * @tc.steps: step1. InitSchemaDb
1769 */
1770 InitSchemaDb();
1771 DBStatus status = OK;
1772 std::vector<std::string> devices;
1773 devices.push_back(g_deviceB->GetDeviceId());
1774
1775 /**
1776 * @tc.steps: step2. deviceA put {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1777 {key21, SCHEMA_VALUE2} - {key29, SCHEMA_VALUE2}
1778 */
1779 Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1780 Value value2(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end());
1781 Key key = {'1'};
1782 Key key2 = {'2'};
1783 const int dataSize = 4000;
1784 for (int i = 0; i < dataSize; i++) {
1785 key.push_back(i);
1786 key2.push_back(i);
1787 status = g_schemaKvDelegatePtr->Put(key, value);
1788 ASSERT_TRUE(status == OK);
1789 status = g_schemaKvDelegatePtr->Put(key2, value2);
1790 ASSERT_TRUE(status == OK);
1791 key.pop_back();
1792 key2.pop_back();
1793 }
1794 ASSERT_TRUE(status == OK);
1795
1796 /**
1797 * @tc.steps: step3. deviceA call query sync and wait
1798 * @tc.expected: step3. sync should return OK.
1799 */
1800 Query query = Query::Select().EqualTo("$.field_name1", 1);
1801 std::map<std::string, DBStatus> result;
1802 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1803 ASSERT_TRUE(status == OK);
1804
1805 /**
1806 * @tc.expected: step4. onComplete should be called, DeviceB have {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1807 */
1808 ASSERT_TRUE(result.size() == devices.size());
1809 for (const auto &pair : result) {
1810 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1811 EXPECT_TRUE(pair.second == OK);
1812 }
1813 VirtualDataItem item;
1814 VirtualDataItem item2;
1815 for (int i = 0; i < dataSize; i++) {
1816 key.push_back(i);
1817 key2.push_back(i);
1818 g_deviceB->GetData(key, item);
1819 EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
1820 EXPECT_TRUE(item.value == value);
1821 key.pop_back();
1822 key2.pop_back();
1823 }
1824 }
1825
1826 /**
1827 * @tc.name: AllPredicateQuerySync002
1828 * @tc.desc: Test wrong query param push sync for AllPredicate data.
1829 * @tc.type: FUNC
1830 * @tc.require:
1831 * @tc.author: zhuwentao
1832 */
1833 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync002, TestSize.Level1)
1834 {
1835 /**
1836 * @tc.steps: step1. InitSchemaDb
1837 */
1838 InitSchemaDb();
1839 DBStatus status = OK;
1840 std::vector<std::string> devices;
1841 devices.push_back(g_deviceB->GetDeviceId());
1842
1843 /**
1844 * @tc.steps: step2. deviceA call query sync and wait
1845 * @tc.expected: step2. sync should return INVALID_QUERY_FIELD
1846 */
1847 Query query = Query::Select().GreaterThan("field_name11", 10);
1848 std::map<std::string, DBStatus> result;
1849 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1850 ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1851 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, query);
1852 ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1853 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
1854 ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1855 }
1856
1857 /**
1858 * @tc.name: AllPredicateQuerySync003
1859 * @tc.desc: Test normal push sync for AllPredicate data with limit
1860 * @tc.type: FUNC
1861 * @tc.require:
1862 * @tc.author: zhuwentao
1863 */
1864 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync003, TestSize.Level1)
1865 {
1866 /**
1867 * @tc.steps: step1. InitSchemaDb
1868 */
1869 InitSchemaDb();
1870 DBStatus status = OK;
1871 std::vector<std::string> devices;
1872 devices.push_back(g_deviceB->GetDeviceId());
1873
1874 /**
1875 * @tc.steps: step2. deviceA put {key1, SCHEMA_VALUE1} - {key9, SCHEMA_VALUE1}
1876 */
1877 Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1878 Value value2(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end());
1879 Key key = {'1'};
1880 Key key2 = {'2'};
1881 const int dataSize = 10;
1882 for (int i = 0; i < dataSize; i++) {
1883 key.push_back(i);
1884 key2.push_back(i);
1885 status = g_schemaKvDelegatePtr->Put(key, value);
1886 ASSERT_TRUE(status == OK);
1887 status = g_schemaKvDelegatePtr->Put(key2, value2);
1888 ASSERT_TRUE(status == OK);
1889 key.pop_back();
1890 key2.pop_back();
1891 }
1892 ASSERT_TRUE(status == OK);
1893
1894 /**
1895 * @tc.steps: step3. deviceA call query sync with limit and wait
1896 * @tc.expected: step3. sync should return OK.
1897 */
1898 Query query = Query::Select().EqualTo("$.field_name1", 1).Limit(20, 0);
1899 std::map<std::string, DBStatus> result;
1900 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1901 ASSERT_TRUE(status == OK);
1902
1903 /**
1904 * @tc.expected: step4. onComplete should be called, DeviceB have {key1, SCHEMA_VALUE1} - {key9, SCHEMA_VALUE1}
1905 */
1906 ASSERT_TRUE(result.size() == devices.size());
1907 for (const auto &pair : result) {
1908 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1909 EXPECT_TRUE(pair.second == OK);
1910 }
1911 VirtualDataItem item;
1912 VirtualDataItem item2;
1913 for (int i = 0; i < dataSize; i++) {
1914 key.push_back(i);
1915 key2.push_back(i);
1916 g_deviceB->GetData(key, item);
1917 EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
1918 EXPECT_TRUE(item.value == value);
1919 key.pop_back();
1920 key2.pop_back();
1921 }
1922 }
1923
1924 /**
1925 * @tc.name: AllPredicateQuerySync004
1926 * @tc.desc: Test normal pull sync for AllPredicate data.
1927 * @tc.type: FUNC
1928 * @tc.require:
1929 * @tc.author: zhuwentao
1930 */
1931 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync004, TestSize.Level1)
1932 {
1933 /**
1934 * @tc.steps: step1. InitSchemaDb
1935 */
1936 InitSchemaDb();
1937 std::vector<std::string> devices;
1938 devices.push_back(g_deviceB->GetDeviceId());
1939
1940 /**
1941 * @tc.steps: step2. deviceB put {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1942 */
1943 Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1944 Key key = {'1'};
1945 const int dataSize = 10;
1946 for (int i = 0; i < dataSize; i++) {
1947 key.push_back(i);
1948 g_deviceB->PutData(key, value, 10 + i, 0);
1949 key.pop_back();
1950 }
1951
1952 /**
1953 * @tc.steps: step3. deviceA call query sync and wait
1954 * @tc.expected: step3. sync should return OK.
1955 */
1956 Query query = Query::Select().EqualTo("$.field_name1", 1);
1957 std::map<std::string, DBStatus> result;
1958 DBStatus status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
1959 ASSERT_TRUE(status == OK);
1960
1961 /**
1962 * @tc.expected: step4. onComplete should be called, DeviceA have {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1963 */
1964 ASSERT_TRUE(result.size() == devices.size());
1965 for (const auto &pair : result) {
1966 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1967 EXPECT_TRUE(pair.second == OK);
1968 }
1969 Value item;
1970 Value item2;
1971 for (int i = 0; i < dataSize; i++) {
1972 key.push_back(i);
1973 g_schemaKvDelegatePtr->Get(key, item);
1974 EXPECT_TRUE(item == value);
1975 key.pop_back();
1976 }
1977 }