1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17 #include <thread>
18
19 #include "db_common.h"
20 #include "db_constant.h"
21 #include "distributeddb_data_generate_unit_test.h"
22 #include "distributeddb_tools_unit_test.h"
23 #include "generic_single_ver_kv_entry.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "platform_specific.h"
27 #include "query.h"
28 #include "query_sync_object.h"
29 #include "single_ver_data_sync.h"
30 #include "single_ver_serialize_manager.h"
31 #include "sync_types.h"
32 #include "virtual_communicator.h"
33 #include "virtual_communicator_aggregator.h"
34 #include "virtual_single_ver_sync_db_Interface.h"
35
36 using namespace testing::ext;
37 using namespace DistributedDB;
38 using namespace DistributedDBUnitTest;
39 using namespace std;
40
41 namespace {
42 string g_testDir;
43 const string STORE_ID = "kv_store_sync_test";
44 const string SCHEMA_STORE_ID = "kv_store_sync_schema_test";
45 const std::string DEVICE_B = "deviceB";
46
47 KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
48 KvStoreDelegateManager g_schemaMgr(SCHEMA_APP_ID, USER_ID);
49 KvStoreConfig g_config;
50 DistributedDBToolsUnitTest g_tool;
51 DBStatus g_kvDelegateStatus = INVALID_ARGS;
52 DBStatus g_schemaKvDelegateStatus = INVALID_ARGS;
53 KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
54 KvStoreNbDelegate* g_schemaKvDelegatePtr = nullptr;
55 VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
56 KvVirtualDevice *g_deviceB = nullptr;
57
58 // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
59 auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
60 placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
61 auto g_schemaKvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
62 placeholders::_1, placeholders::_2, std::ref(g_schemaKvDelegateStatus), std::ref(g_schemaKvDelegatePtr));
63 const string SCHEMA_STRING =
64 "{\"SCHEMA_VERSION\":\"1.0\","
65 "\"SCHEMA_MODE\":\"STRICT\","
66 "\"SCHEMA_DEFINE\":{"
67 "\"field_name1\":\"BOOL\","
68 "\"field_name2\":\"BOOL\","
69 "\"field_name3\":\"INTEGER, NOT NULL\","
70 "\"field_name4\":\"LONG, DEFAULT 100\","
71 "\"field_name5\":\"DOUBLE, NOT NULL, DEFAULT 3.14\","
72 "\"field_name6\":\"STRING, NOT NULL, DEFAULT '3.1415'\","
73 "\"field_name7\":\"LONG, DEFAULT 100\","
74 "\"field_name8\":\"LONG, DEFAULT 100\","
75 "\"field_name9\":\"LONG, DEFAULT 100\","
76 "\"field_name10\":\"LONG, DEFAULT 100\""
77 "},"
78 "\"SCHEMA_INDEXES\":[\"$.field_name1\", \"$.field_name2\"]}";
79
80 const std::string SCHEMA_VALUE1 =
81 "{\"field_name1\":true,"
82 "\"field_name2\":false,"
83 "\"field_name3\":10,"
84 "\"field_name4\":20,"
85 "\"field_name5\":3.14,"
86 "\"field_name6\":\"3.1415\","
87 "\"field_name7\":100,"
88 "\"field_name8\":100,"
89 "\"field_name9\":100,"
90 "\"field_name10\":100}";
91
92 const std::string SCHEMA_VALUE2 =
93 "{\"field_name1\":false,"
94 "\"field_name2\":true,"
95 "\"field_name3\":100,"
96 "\"field_name4\":200,"
97 "\"field_name5\":3.14,"
98 "\"field_name6\":\"3.1415\","
99 "\"field_name7\":100,"
100 "\"field_name8\":100,"
101 "\"field_name9\":100,"
102 "\"field_name10\":100}";
103 }
104
105 class DistributedDBSingleVerP2PQuerySyncTest : public testing::Test {
106 public:
107 static void SetUpTestCase(void);
108 static void TearDownTestCase(void);
109 void SetUp();
110 void TearDown();
111 };
112
SetUpTestCase(void)113 void DistributedDBSingleVerP2PQuerySyncTest::SetUpTestCase(void)
114 {
115 /**
116 * @tc.setup: Init datadir and Virtual Communicator.
117 */
118 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
119 string dir = g_testDir + "/single_ver";
120 DIR* dirTmp = opendir(dir.c_str());
121 if (dirTmp == nullptr) {
122 OS::MakeDBDirectory(dir);
123 } else {
124 closedir(dirTmp);
125 }
126
127 g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
128 ASSERT_TRUE(g_communicatorAggregator != nullptr);
129 RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
130 }
131
TearDownTestCase(void)132 void DistributedDBSingleVerP2PQuerySyncTest::TearDownTestCase(void)
133 {
134 /**
135 * @tc.teardown: Release virtual Communicator and clear data dir.
136 */
137 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
138 LOGE("rm test db files error!");
139 }
140 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
141 }
142
SetUp(void)143 void DistributedDBSingleVerP2PQuerySyncTest::SetUp(void)
144 {
145 DistributedDBToolsUnitTest::PrintTestCaseInfo();
146 /**
147 * @tc.setup: create virtual device B and get a KvStoreNbDelegate as deviceA
148 */
149 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
150 ASSERT_TRUE(g_deviceB != nullptr);
151 VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
152 ASSERT_TRUE(syncInterfaceB != nullptr);
153 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
154 }
155
TearDown(void)156 void DistributedDBSingleVerP2PQuerySyncTest::TearDown(void)
157 {
158 /**
159 * @tc.teardown: Release device A, B
160 */
161 if (g_kvDelegatePtr != nullptr) {
162 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
163 g_kvDelegatePtr = nullptr;
164 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
165 LOGD("delete kv store status %d", status);
166 ASSERT_TRUE(status == OK);
167 }
168 if (g_schemaKvDelegatePtr != nullptr) {
169 ASSERT_EQ(g_schemaMgr.CloseKvStore(g_schemaKvDelegatePtr), OK);
170 g_schemaKvDelegatePtr = nullptr;
171 DBStatus status = g_schemaMgr.DeleteKvStore(SCHEMA_STORE_ID);
172 LOGD("delete kv store status %d", status);
173 ASSERT_TRUE(status == OK);
174 }
175 if (g_deviceB != nullptr) {
176 delete g_deviceB;
177 g_deviceB = nullptr;
178 }
179 PermissionCheckCallbackV2 nullCallback;
180 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
181 }
182
InitNormalDb()183 void InitNormalDb()
184 {
185 g_config.dataDir = g_testDir;
186 g_mgr.SetKvStoreConfig(g_config);
187 KvStoreNbDelegate::Option option;
188 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
189 ASSERT_TRUE(g_kvDelegateStatus == OK);
190 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
191 }
192
InitSchemaDb()193 void InitSchemaDb()
194 {
195 g_config.dataDir = g_testDir;
196 g_schemaMgr.SetKvStoreConfig(g_config);
197 KvStoreNbDelegate::Option option;
198 option.schema = SCHEMA_STRING;
199 g_schemaMgr.GetKvStore(SCHEMA_STORE_ID, option, g_schemaKvDelegateCallback);
200 ASSERT_TRUE(g_schemaKvDelegateStatus == OK);
201 ASSERT_TRUE(g_schemaKvDelegatePtr != nullptr);
202 }
203
204 /**
205 * @tc.name: Normal Sync 001
206 * @tc.desc: Test normal push sync for keyprefix data.
207 * @tc.type: FUNC
208 * @tc.require: AR000FN6G9
209 * @tc.author: xushaohua
210 */
211 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync001, TestSize.Level1)
212 {
213 InitNormalDb();
214 DBStatus status = OK;
215 std::vector<std::string> devices;
216 devices.push_back(g_deviceB->GetDeviceId());
217
218 /**
219 * @tc.steps: step1. deviceA put {k0, v0} - {k9, v9}
220 */
221 Key key = {'1'};
222 Value value = {'1'};
223 const int dataSize = 10;
224 for (int i = 0; i < dataSize; i++) {
225 key.push_back(i);
226 value.push_back(i);
227 status = g_kvDelegatePtr->Put(key, value);
228 ASSERT_TRUE(status == OK);
229 key.pop_back();
230 value.pop_back();
231 }
232 Key key2 = {'2'};
233 Value value2 = {'2'};
234 status = g_kvDelegatePtr->Put(key2, value2);
235 ASSERT_TRUE(status == OK);
236
237 /**
238 * @tc.steps: step2. deviceA call query sync and wait
239 * @tc.expected: step2. sync should return OK.
240 */
241 Query query = Query::Select().PrefixKey(key);
242 std::map<std::string, DBStatus> result;
243 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
244 ASSERT_TRUE(status == OK);
245
246 /**
247 * @tc.expected: step3. onComplete should be called, DeviceB have {k1,v1} - {k9, v9}
248 */
249 ASSERT_TRUE(result.size() == devices.size());
250 for (const auto &pair : result) {
251 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
252 EXPECT_TRUE(pair.second == OK);
253 }
254 VirtualDataItem item;
255 for (int i = 0; i < dataSize; i++) {
256 key.push_back(i);
257 value.push_back(i);
258 g_deviceB->GetData(key, item);
259 EXPECT_TRUE(item.value == value);
260 key.pop_back();
261 value.pop_back();
262 }
263 EXPECT_TRUE(g_deviceB->GetData(key2, item) != E_OK);
264 }
265
266 /**
267 * @tc.name: Normal Sync 002
268 * @tc.desc: Test normal push sync for limit and offset.
269 * @tc.type: FUNC
270 * @tc.require: AR000FN6G9
271 * @tc.author: xushaohua
272 */
273 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync002, TestSize.Level1)
274 {
275 InitNormalDb();
276 DBStatus status = OK;
277 std::vector<std::string> devices;
278 devices.push_back(g_deviceB->GetDeviceId());
279
280 /**
281 * @tc.steps: step1. deviceA put {k0, v0} - {k9, v9}
282 */
283 Key key = {'1'};
284 Value value = {'1'};
285 const int dataSize = 10;
286 for (int i = 0; i < dataSize; i++) {
287 key.push_back(i);
288 value.push_back(i);
289 status = g_kvDelegatePtr->Put(key, value);
290 ASSERT_TRUE(status == OK);
291 key.pop_back();
292 value.pop_back();
293 }
294
295 /**
296 * @tc.steps: step2. deviceA call sync and wait
297 * @tc.expected: step2. sync should return OK.
298 */
299 const int limit = 5;
300 const int offset = 4;
301 Query query = Query::Select().PrefixKey(key).Limit(limit, offset);
302 std::map<std::string, DBStatus> result;
303 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
304 ASSERT_TRUE(status == OK);
305
306 /**
307 * @tc.expected: step3. onComplete should be called, DeviceB have {k4,v4} {k8, v8}
308 */
309 ASSERT_TRUE(result.size() == devices.size());
310 for (const auto &pair : result) {
311 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
312 EXPECT_TRUE(pair.second == OK);
313 }
314
315 VirtualDataItem item;
316 for (int i = limit - 1; i < limit + offset; i++) {
317 key.push_back(i);
318 value.push_back(i);
319 g_deviceB->GetData(key, item);
320 EXPECT_TRUE(item.value == value);
321 key.pop_back();
322 value.pop_back();
323 }
324 }
325
326 /**
327 * @tc.name: Normal Sync 001
328 * @tc.desc: Test normal push_and_pull sync for keyprefix data.
329 * @tc.type: FUNC
330 * @tc.require: AR000FN6G9
331 * @tc.author: zhuwentao
332 */
333 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync003, TestSize.Level1)
334 {
335 InitNormalDb();
336 DBStatus status = OK;
337 std::vector<std::string> devices;
338 devices.push_back(g_deviceB->GetDeviceId());
339
340 /**
341 * @tc.steps: step1. deviceA put {k, v}, {b, v}
342 */
343 Key key = {'1'};
344 Value value = {'1'};
345 const int dataSize = 10;
346 status = g_kvDelegatePtr->Put(key, value);
347 ASSERT_TRUE(status == OK);
348 Key key2 = {'2'};
349 Value value2 = {'2'};
350 status = g_kvDelegatePtr->Put(key2, value2);
351 ASSERT_TRUE(status == OK);
352
353 /**
354 * @tc.steps: step2. deviceB put {b0, v0} - {b9, v9}, {c, v}
355 */
356 for (int i = 0; i < dataSize; i++) {
357 key2.push_back(i);
358 value2.push_back(i);
359 g_deviceB->PutData(key2, value2, 10 + i, 0);
360 key2.pop_back();
361 value2.pop_back();
362 }
363 Key key3 = {'3'};
364 Value value3 = {'3'};
365 g_deviceB->PutData(key3, value3, 20, 0);
366
367 /**
368 * @tc.steps: step2. deviceA call query sync and wait
369 * @tc.expected: step2. sync should return OK.
370 */
371 Query query = Query::Select().PrefixKey(key2);
372 std::map<std::string, DBStatus> result;
373 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, query);
374 ASSERT_TRUE(status == OK);
375
376 /**
377 * @tc.expected: step3. onComplete should be called, DeviceA have {b0, v0} - {b9, v9}, DeviceB have {b, v}
378 */
379 ASSERT_TRUE(result.size() == devices.size());
380 for (const auto &pair : result) {
381 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
382 EXPECT_TRUE(pair.second == OK);
383 }
384 VirtualDataItem item;
385 Value tmpValue;
386 for (int i = 0; i < dataSize; i++) {
387 key2.push_back(i);
388 value2.push_back(i);
389 g_kvDelegatePtr->Get(key2, tmpValue);
390 EXPECT_TRUE(tmpValue == value2);
391 key2.pop_back();
392 value2.pop_back();
393 }
394 EXPECT_TRUE(g_deviceB->GetData(key, item) != E_OK);
395 EXPECT_TRUE(g_deviceB->GetData(key2, item) == E_OK);
396 g_kvDelegatePtr->Get(key3, tmpValue);
397 EXPECT_TRUE(tmpValue != value3);
398 }
399
400 /**
401 * @tc.name: Normal Sync 001
402 * @tc.desc: Test normal pull sync for keyprefix data.
403 * @tc.type: FUNC
404 * @tc.require: AR000FN6G9
405 * @tc.author: zhuwentao
406 */
407 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync004, TestSize.Level1)
408 {
409 InitNormalDb();
410 DBStatus status = OK;
411 std::vector<std::string> devices;
412 devices.push_back(g_deviceB->GetDeviceId());
413 /**
414 * @tc.steps: step1. deviceB put {k1, v1} - {k9, k9}, {b0, v0} - {b9, v9}
415 */
416 Key key = {'1'};
417 Value value = {'1'};
418 const int dataSize = 10;
419 Key key2 = {'2'};
420 Value value2 = {'2'};
421 vector<std::pair<Key, Value>> key1Vec;
422 vector<std::pair<Key, Value>> key2Vec;
423 for (int i = 0; i < dataSize; i++) {
424 Key tmpKey(key);
425 Value tmpValue(value);
426 tmpKey.push_back(i);
427 tmpValue.push_back(i);
428 key1Vec.push_back(pair<Key, Value> {tmpKey, tmpValue});
429 }
430 for (int i = 0; i < dataSize; i++) {
431 Key tmpKey(key2);
432 Value tmpValue(value2);
433 tmpKey.push_back(i);
434 tmpValue.push_back(i);
435 key2Vec.push_back(pair<Key, Value> {tmpKey, tmpValue});
436 }
437 for (int i = 0; i < dataSize; i++) {
438 g_deviceB->PutData(key2Vec[i].first, key2Vec[i].second, 20 + i, 0);
439 g_deviceB->PutData(key1Vec[i].first, key1Vec[i].second, 10 + i, 0);
440 }
441
442 /**
443 * @tc.steps: step2. deviceA call query sync and wait
444 * @tc.expected: step2. sync should return OK.
445 */
446 Query query = Query::Select().PrefixKey(key2);
447 std::map<std::string, DBStatus> result;
448 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
449 ASSERT_TRUE(status == OK);
450
451 /**
452 * @tc.expected: step3. onComplete should be called, DeviceA have {b0, v0} - {b9, v9}
453 */
454 ASSERT_TRUE(result.size() == devices.size());
455 for (const auto &pair : result) {
456 EXPECT_TRUE(pair.second == OK);
457 }
458 VirtualDataItem item;
459 Value tmpValue;
460 for (int i = 0; i < dataSize; i++) {
461 g_kvDelegatePtr->Get(key2Vec[i].first, tmpValue);
462 EXPECT_TRUE(tmpValue == key2Vec[i].second);
463 g_kvDelegatePtr->Get(key1Vec[i].first, tmpValue);
464 EXPECT_TRUE(tmpValue != key1Vec[i].second);
465 }
466 }
467
468 /**
469 * @tc.name: NormalSync005
470 * @tc.desc: Test normal push sync for inkeys query.
471 * @tc.type: FUNC
472 * @tc.require: AR000GOHO7
473 * @tc.author: lidongwei
474 */
475 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync005, TestSize.Level1)
476 {
477 InitNormalDb();
478 std::vector<std::string> devices;
479 devices.push_back(g_deviceB->GetDeviceId());
480
481 /**
482 * @tc.steps: step1. deviceA put K1-K5
483 */
484 ASSERT_EQ(g_kvDelegatePtr->PutBatch(
485 {{KEY_1, VALUE_1}, {KEY_2, VALUE_2}, {KEY_3, VALUE_3}, {KEY_4, VALUE_4}, {KEY_5, VALUE_5}}), OK);
486
487 /**
488 * @tc.steps: step2. deviceA sync K2,K4 and wait
489 * @tc.expected: step2. sync should return OK.
490 */
491 Query query = Query::Select().InKeys({KEY_2, KEY_4});
492 std::map<std::string, DBStatus> result;
493 ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), OK);
494
495 /**
496 * @tc.expected: step3. onComplete should be called.
497 */
498 ASSERT_EQ(result.size(), devices.size());
499 for (const auto &pair : result) {
500 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
501 EXPECT_EQ(pair.second, OK);
502 }
503
504 /**
505 * @tc.steps: step4. deviceB have K2K4 and have no K1K3K5.
506 * @tc.expected: step4. sync should return OK.
507 */
508 VirtualDataItem item;
509 EXPECT_EQ(g_deviceB->GetData(KEY_2, item), E_OK);
510 EXPECT_EQ(item.value, VALUE_2);
511 EXPECT_EQ(g_deviceB->GetData(KEY_4, item), E_OK);
512 EXPECT_EQ(item.value, VALUE_4);
513 EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
514 EXPECT_EQ(g_deviceB->GetData(KEY_3, item), -E_NOT_FOUND);
515 EXPECT_EQ(g_deviceB->GetData(KEY_5, item), -E_NOT_FOUND);
516
517 /**
518 * @tc.steps: step5. deviceA sync with invalid inkeys query
519 * @tc.expected: step5. sync failed and the rc is right.
520 */
521 query = Query::Select().InKeys({});
522 result.clear();
523 ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), INVALID_ARGS);
524
525 std::set<Key> keys;
526 for (uint8_t i = 0; i < DBConstant::MAX_BATCH_SIZE + 1; i++) {
527 Key key = { i };
528 keys.emplace(key);
529 }
530 query = Query::Select().InKeys(keys);
531 result.clear();
532 ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), OVER_MAX_LIMITS);
533
534 query = Query::Select().InKeys({{}});
535 result.clear();
536 ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), INVALID_ARGS);
537 }
538
539 /**
540 * @tc.name: NormalSync006
541 * @tc.desc: Test normal push sync with query by 32 devices;
542 * @tc.type: FUNC
543 * @tc.require:
544 * @tc.author: zhuwentao
545 */
546 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync006, TestSize.Level1)
547 {
548 /**
549 * @tc.steps: step1. init db and 32 devices
550 */
551 InitNormalDb();
552 uint32_t syncDevCount = 32u;
553 std::vector<KvVirtualDevice *> virtualDeviceVec(syncDevCount, nullptr);
554 const std::string device = "deviceTmp_";
555 std::vector<std::string> devices;
556 bool isError = false;
557 for (uint32_t i = 0; i < syncDevCount; i++) {
558 std::string tmpDev = device + std::to_string(i);
559 virtualDeviceVec[i] = new (std::nothrow) KvVirtualDevice(tmpDev);
560 if (virtualDeviceVec[i] == nullptr) {
561 isError = true;
562 break;
563 }
564 VirtualSingleVerSyncDBInterface *tmpSyncInterface = new (std::nothrow) VirtualSingleVerSyncDBInterface();
565 if (tmpSyncInterface == nullptr) {
566 isError = true;
567 break;
568 }
569 ASSERT_EQ(virtualDeviceVec[i]->Initialize(g_communicatorAggregator, tmpSyncInterface), E_OK);
570 devices.push_back(virtualDeviceVec[i]->GetDeviceId());
571 }
572 if (isError) {
573 for (uint32_t i = 0; i < syncDevCount; i++) {
574 if (virtualDeviceVec[i] != nullptr) {
575 delete virtualDeviceVec[i];
576 virtualDeviceVec[i] = nullptr;
577 }
578 }
579 ASSERT_TRUE(false);
580 }
581 /**
582 * @tc.steps: step2. deviceA put {k0, v0}
583 */
584 Key key = {'1'};
585 Value value = {'1'};
586 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
587 /**
588 * @tc.steps: step3. deviceA call query sync and wait
589 * @tc.expected: step3. sync should return OK.
590 */
591 Query query = Query::Select().PrefixKey(key);
592 std::map<std::string, DBStatus> result;
593 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query) == OK);
594
595 /**
596 * @tc.expected: step3. onComplete should be called, DeviceB have {k1,v1} - {k9, v9}
597 */
598 ASSERT_TRUE(result.size() == devices.size());
599 for (const auto &pair : result) {
600 EXPECT_TRUE(pair.second == OK);
601 }
602 VirtualDataItem item;
603 for (uint32_t i = 0; i < syncDevCount; i++) {
604 EXPECT_TRUE(virtualDeviceVec[i]->GetData(key, item) == E_OK);
605 EXPECT_EQ(item.value, value);
606 delete virtualDeviceVec[i];
607 virtualDeviceVec[i] = nullptr;
608 }
609 }
610
611 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, QueryRequestPacketTest001, TestSize.Level1)
612 {
613 /**
614 * @tc.steps: step1. prepare a QuerySyncRequestPacket.
615 */
616 auto packet = new (std::nothrow) DataRequestPacket;
617 ASSERT_TRUE(packet != nullptr);
618 auto kvEntry = new (std::nothrow) GenericSingleVerKvEntry;
619 ASSERT_TRUE(kvEntry != nullptr);
620 kvEntry->SetTimestamp(1);
621 SyncEntry syncData {.entries = {kvEntry}};
622 #ifndef OMIT_ZLIB
623 ASSERT_TRUE(GenericSingleVerKvEntry::Compress(syncData.entries, syncData.compressedEntries,
624 {CompressAlgorithm::ZLIB, SOFTWARE_VERSION_CURRENT}) == E_OK);
625 packet->SetCompressAlgo(CompressAlgorithm::ZLIB);
626 packet->SetFlag(4); // set IS_COMPRESS_DATA flag true
627 #endif
628 packet->SetBasicInfo(-E_NOT_SUPPORT, SOFTWARE_VERSION_CURRENT, SyncModeType::QUERY_PUSH_PULL);
629 packet->SetData(syncData.entries);
630 packet->SetCompressData(syncData.compressedEntries);
631 packet->SetEndWaterMark(INT8_MAX);
632 packet->SetWaterMark(INT16_MAX, INT32_MAX, INT64_MAX);
633 QuerySyncObject syncQuery(Query::Select().PrefixKey({'2'}));
634 packet->SetQuery(syncQuery);
635 packet->SetQueryId(syncQuery.GetIdentify());
636 packet->SetReserved(std::vector<uint64_t> {INT8_MAX});
637
638 /**
639 * @tc.steps: step2. put the QuerySyncRequestPacket into a message.
640 */
641 Message msg;
642 msg.SetExternalObject(packet);
643 msg.SetMessageId(QUERY_SYNC_MESSAGE);
644 msg.SetMessageType(TYPE_REQUEST);
645
646 /**
647 * @tc.steps: step3. Serialization the message to a buffer.
648 */
649 int len = static_cast<int>(SingleVerSerializeManager::CalculateLen(&msg));
650 vector<uint8_t> buffer(len);
651 ASSERT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), &msg), E_OK);
652
653 /**
654 * @tc.steps: step4. DeSerialization the buffer to a message.
655 */
656 Message outMsg(QUERY_SYNC_MESSAGE);
657 outMsg.SetMessageType(TYPE_REQUEST);
658 ASSERT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), &outMsg), E_OK);
659
660 /**
661 * @tc.steps: step5. checkout the outMsg.
662 * @tc.expected: step5. outMsg equal the the in msg
663 */
664 auto outPacket = outMsg.GetObject<DataRequestPacket>();
665 EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
666 EXPECT_EQ(outPacket->GetMode(), SyncModeType::QUERY_PUSH_PULL);
667 EXPECT_EQ(outPacket->GetEndWaterMark(), static_cast<uint64_t>(INT8_MAX));
668 EXPECT_EQ(outPacket->GetLocalWaterMark(), static_cast<uint64_t>(INT16_MAX));
669 EXPECT_EQ(outPacket->GetPeerWaterMark(), static_cast<uint64_t>(INT32_MAX));
670 EXPECT_EQ(outPacket->GetDeletedWaterMark(), static_cast<uint64_t>(INT64_MAX));
671 #ifndef OMIT_ZLIB
672 EXPECT_EQ(outPacket->GetFlag(), static_cast<uint32_t>(4)); // check IS_COMPRESS_DATA flag true
673 #endif
674 EXPECT_EQ(outPacket->GetQueryId(), syncQuery.GetIdentify());
675 EXPECT_EQ(outPacket->GetReserved(), std::vector<uint64_t> {INT8_MAX});
676 EXPECT_EQ(outPacket->GetSendCode(), -E_NOT_SUPPORT);
677 EXPECT_EQ(outPacket->GetData()[0]->GetTimestamp(), 1u);
678 }
679
680 /**
681 * @tc.name: QueryRequestPacketTest002
682 * @tc.desc: Test exception branch of serialization.
683 * @tc.type: FUNC
684 * @tc.require:
685 * @tc.author: zhangshijie
686 */
687 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, SerializationManager001, TestSize.Level1)
688 {
689 /**
690 * @tc.steps: step1. call SingleVerSerializeManager::Serialization with buffer = nullptr or msg = nullptr
691 * @tc.expected:step1 return -E_MESSAGE_ID_ERROR
692 */
693 Message msg;
694 msg.SetMessageType(TYPE_INVALID);
695 vector<uint8_t> buffer(10); // 10 is test buffer len
696 EXPECT_EQ(SingleVerSerializeManager::Serialization(nullptr, buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
697 EXPECT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), nullptr), -E_MESSAGE_ID_ERROR);
698
699 /**
700 * @tc.steps: step2. call SingleVerSerializeManager::Serialization with invalid type message
701 * @tc.expected:step2 return -E_MESSAGE_ID_ERROR
702 */
703 EXPECT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
704
705 /**
706 * @tc.steps: step3. call SingleVerSerializeManager::DeSerialization with buffer = nullptr or msg = nullptr
707 * @tc.expected:step3 return -E_MESSAGE_ID_ERROR
708 */
709 EXPECT_EQ(SingleVerSerializeManager::DeSerialization(nullptr, buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
710 EXPECT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), nullptr), -E_MESSAGE_ID_ERROR);
711
712 /**
713 * @tc.steps: step4. call SingleVerSerializeManager::DeSerialization with invalid type message
714 * @tc.expected:step4 return -E_MESSAGE_ID_ERROR
715 */
716 EXPECT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
717 }
718
719 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, QueryAckPacketTest001, TestSize.Level1)
720 {
721 /**
722 * @tc.steps: step1. prepare a QuerySyncAckPacket.
723 */
724 DataAckPacket packet;
725 packet.SetVersion(SOFTWARE_VERSION_CURRENT);
726 packet.SetData(INT64_MAX);
727 packet.SetRecvCode(-E_NOT_SUPPORT);
728 std::vector<uint64_t> reserved = {INT8_MAX};
729 packet.SetReserved(reserved);
730
731 /**
732 * @tc.steps: step2. put the QuerySyncAckPacket into a message.
733 */
734 Message msg;
735 msg.SetCopiedObject(packet);
736 msg.SetMessageId(QUERY_SYNC_MESSAGE);
737 msg.SetMessageType(TYPE_RESPONSE);
738
739 /**
740 * @tc.steps: step3. Serialization the message to a buffer.
741 */
742 int len = static_cast<int>(SingleVerSerializeManager::CalculateLen(&msg));
743 LOGE("test leng = %d", len);
744 uint8_t *buffer = new (nothrow) uint8_t[len];
745 ASSERT_TRUE(buffer != nullptr);
746 int errCode = SingleVerSerializeManager::Serialization(buffer, len, &msg);
747 ASSERT_EQ(errCode, E_OK);
748
749 /**
750 * @tc.steps: step4. DeSerialization the buffer to a message.
751 */
752 Message outMsg;
753 outMsg.SetMessageId(QUERY_SYNC_MESSAGE);
754 outMsg.SetMessageType(TYPE_RESPONSE);
755 errCode = SingleVerSerializeManager::DeSerialization(buffer, len, &outMsg);
756 ASSERT_EQ(errCode, E_OK);
757
758 /**
759 * @tc.steps: step5. checkout the outMsg.
760 * @tc.expected: step5. outMsg equal the the in msg
761 */
762 auto outPacket = outMsg.GetObject<DataAckPacket>();
763 EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
764 EXPECT_EQ(outPacket->GetData(), static_cast<uint64_t>(INT64_MAX));
765 std::vector<uint64_t> reserved2 = {INT8_MAX};
766 EXPECT_EQ(outPacket->GetReserved(), reserved2);
767 EXPECT_EQ(outPacket->GetRecvCode(), -E_NOT_SUPPORT);
768 delete[] buffer;
769 }
770
771 /**
772 * @tc.name: GetQueryWaterMark 001
773 * @tc.desc: Test metaData save and get queryWaterMark.
774 * @tc.type: FUNC
775 * @tc.require: AR000FN6G9
776 * @tc.author: zhangqiquan
777 */
778 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark001, TestSize.Level1)
779 {
780 VirtualSingleVerSyncDBInterface storage;
781 Metadata meta;
782
783 /**
784 * @tc.steps: step1. initialize meta with storage
785 * @tc.expected: step1. E_OK
786 */
787 int errCode = meta.Initialize(&storage);
788 ASSERT_EQ(errCode, E_OK);
789
790 /**
791 * @tc.steps: step2. save receive and send watermark
792 * @tc.expected: step2. E_OK
793 */
794 WaterMark w1 = 1;
795 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w1), E_OK);
796 EXPECT_EQ(meta.SetSendQueryWaterMark("Q1", "D1", w1), E_OK);
797
798 /**
799 * @tc.steps: step3. get receive and send watermark
800 * @tc.expected: step3. E_OK and get the latest value
801 */
802 WaterMark w = 0;
803 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w), E_OK);
804 EXPECT_EQ(w1, w);
805 EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", w), E_OK);
806 EXPECT_EQ(w1, w);
807
808 /**
809 * @tc.steps: step4. set peer and local watermark
810 * @tc.expected: step4. E_OK
811 */
812 WaterMark w2 = 2;
813 EXPECT_EQ(meta.SaveLocalWaterMark("D1", w2), E_OK);
814 EXPECT_EQ(meta.SavePeerWaterMark("D1", w2, true), E_OK);
815
816 /**
817 * @tc.steps: step5. get receive and send watermark
818 * @tc.expected: step5. E_OK and get the w1
819 */
820 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w), E_OK);
821 EXPECT_EQ(w2, w);
822 EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", w), E_OK);
823 EXPECT_EQ(w2, w);
824
825 /**
826 * @tc.steps: step6. set peer and local watermark
827 * @tc.expected: step6. E_OK
828 */
829 WaterMark w3 = 3;
830 EXPECT_EQ(meta.SaveLocalWaterMark("D2", w3), E_OK);
831 EXPECT_EQ(meta.SavePeerWaterMark("D2", w3, true), E_OK);
832
833 /**
834 * @tc.steps: step7. get receive and send watermark
835 * @tc.expected: step7. E_OK and get the w3
836 */
837 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q2", "D2", w), E_OK);
838 EXPECT_EQ(w3, w);
839 EXPECT_EQ(meta.GetSendQueryWaterMark("Q2", "D2", w), E_OK);
840 EXPECT_EQ(w3, w);
841
842 /**
843 * @tc.steps: step8. get not exit receive and send watermark
844 * @tc.expected: step8. E_OK and get the 0
845 */
846 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q3", "D3", w), E_OK);
847 EXPECT_EQ(w, 0u);
848 EXPECT_EQ(meta.GetSendQueryWaterMark("Q3", "D3", w), E_OK);
849 EXPECT_EQ(w, 0u);
850 }
851
852 /**
853 * @tc.name: GetQueryWaterMark 002
854 * @tc.desc: Test metaData save and get queryWaterMark after push or pull mode.
855 * @tc.type: FUNC
856 * @tc.require: AR000FN6G9
857 * @tc.author: zhangqiquan
858 */
859 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark002, TestSize.Level1)
860 {
861 VirtualSingleVerSyncDBInterface storage;
862 Metadata meta;
863
864 /**
865 * @tc.steps: step1. initialize meta with storage
866 * @tc.expected: step1. E_OK
867 */
868 int errCode = meta.Initialize(&storage);
869 ASSERT_EQ(errCode, E_OK);
870
871 /**
872 * @tc.steps: step2. set peer and local watermark
873 * @tc.expected: step2. E_OK
874 */
875 WaterMark w1 = 2;
876 EXPECT_EQ(meta.SaveLocalWaterMark("D1", w1), E_OK);
877 EXPECT_EQ(meta.SavePeerWaterMark("D1", w1, true), E_OK);
878
879 /**
880 * @tc.steps: step2. save receive and send watermark
881 * @tc.expected: step2. E_OK
882 */
883 WaterMark w2 = 1;
884 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w2), E_OK);
885 EXPECT_EQ(meta.SetSendQueryWaterMark("Q1", "D1", w2), E_OK);
886
887 /**
888 * @tc.steps: step3. get receive and send watermark
889 * @tc.expected: step3. E_OK and get the bigger value
890 */
891 WaterMark w = 0;
892 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w), E_OK);
893 EXPECT_EQ(w1, w);
894 EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", w), E_OK);
895 EXPECT_EQ(w1, w);
896 }
897
898 /**
899 * @tc.name: GetQueryWaterMark 003
900 * @tc.desc: check time offset after remove water mark
901 * @tc.type: FUNC
902 * @tc.require:
903 * @tc.author: lianhuix
904 */
905 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark003, TestSize.Level1)
906 {
907 VirtualSingleVerSyncDBInterface storage;
908 Metadata meta;
909
910 int errCode = meta.Initialize(&storage);
911 ASSERT_EQ(errCode, E_OK);
912
913 const std::string DEVICE_B = "DEVICE_B";
914 TimeOffset offset = 100; // 100: offset
915 meta.SaveTimeOffset(DEVICE_B, offset);
916
917 WaterMark w1 = 2; // 2: watermark
918 meta.SavePeerWaterMark(DBCommon::TransferHashString(DEVICE_B), w1, false);
919
920 TimeOffset offsetGot;
921 meta.GetTimeOffset(DEVICE_B, offsetGot);
922 EXPECT_EQ(offsetGot, offset);
923 }
924
925 /**
926 * @tc.name: GetDeleteWaterMark001
927 * @tc.desc: Test metaData save and get deleteWaterMark.
928 * @tc.type: FUNC
929 * @tc.require: AR000FN6G9
930 * @tc.author: zhangqiquan
931 */
932 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteWaterMark001, TestSize.Level1)
933 {
934 VirtualSingleVerSyncDBInterface storage;
935 Metadata meta;
936
937 /**
938 * @tc.steps: step1. initialize meta with storage
939 * @tc.expected: step1. E_OK
940 */
941 int errCode = meta.Initialize(&storage);
942 ASSERT_EQ(errCode, E_OK);
943
944 /**
945 * @tc.steps: step2. set and get recv/send delete watermark
946 * @tc.expected: step2. set E_OK and get water mark is equal with last set
947 */
948 const std::string device = "DEVICE";
949 const WaterMark maxWaterMark = 1000u;
__anond092fe7f0202() 950 std::thread recvThread([&meta, &device, &maxWaterMark]() {
951 for (WaterMark expectRecv = 0u; expectRecv < maxWaterMark; ++expectRecv) {
952 EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark(device, expectRecv), E_OK);
953 WaterMark actualRecv = 0u;
954 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark(device, actualRecv), E_OK);
955 EXPECT_EQ(actualRecv, expectRecv);
956 }
957 });
__anond092fe7f0302() 958 std::thread sendThread([&meta, &device, &maxWaterMark]() {
959 for (WaterMark expectSend = 0u; expectSend < maxWaterMark; ++expectSend) {
960 EXPECT_EQ(meta.SetSendDeleteSyncWaterMark(device, expectSend), E_OK);
961 WaterMark actualSend = 0u;
962 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark(device, actualSend), E_OK);
963 EXPECT_EQ(actualSend, expectSend);
964 }
965 });
966 recvThread.join();
967 sendThread.join();
968 }
969
970 /**
971 * @tc.name: ClearQueryWaterMark 001
972 * @tc.desc: Test metaData clear watermark function.
973 * @tc.type: FUNC
974 * @tc.require: AR000FN6G9
975 * @tc.author: zhangqiquan
976 */
977 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark001, TestSize.Level1)
978 {
979 VirtualSingleVerSyncDBInterface storage;
980 Metadata meta;
981
982 /**
983 * @tc.steps: step1. initialize meta with storage
984 * @tc.expected: step1. E_OK
985 */
986 int errCode = meta.Initialize(&storage);
987 ASSERT_EQ(errCode, E_OK);
988
989 /**
990 * @tc.steps: step2. save receive watermark
991 * @tc.expected: step2. E_OK
992 */
993 WaterMark w1 = 1;
994 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w1), E_OK);
995
996 /**
997 * @tc.steps: step3. erase peer watermark
998 * @tc.expected: step3. E_OK
999 */
1000 EXPECT_EQ(meta.EraseDeviceWaterMark("D1", true), E_OK);
1001
1002 /**
1003 * @tc.steps: step4. get receive watermark
1004 * @tc.expected: step4. E_OK receive watermark is zero
1005 */
1006 WaterMark w2 = -1;
1007 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w2), E_OK);
1008 EXPECT_EQ(w2, 0u);
1009
1010 /**
1011 * @tc.steps: step5. set peer watermark
1012 * @tc.expected: step5. E_OK
1013 */
1014 WaterMark w3 = 2;
1015 EXPECT_EQ(meta.SavePeerWaterMark("D1", w3, true), E_OK);
1016
1017 /**
1018 * @tc.steps: step6. get receive watermark
1019 * @tc.expected: step6. E_OK receive watermark is peer watermark
1020 */
1021 WaterMark w4 = -1;
1022 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w4), E_OK);
1023 EXPECT_EQ(w4, w3);
1024 }
1025
1026 /**
1027 * @tc.name: ClearQueryWaterMark 002
1028 * @tc.desc: Test metaData clear watermark function.
1029 * @tc.type: FUNC
1030 * @tc.require: AR000FN6G9
1031 * @tc.author: zhangqiquan
1032 */
1033 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark002, TestSize.Level1)
1034 {
1035 VirtualSingleVerSyncDBInterface storage;
1036 Metadata meta;
1037
1038 /**
1039 * @tc.steps: step1. initialize meta with storage
1040 * @tc.expected: step1. E_OK
1041 */
1042 int errCode = meta.Initialize(&storage);
1043 ASSERT_EQ(errCode, E_OK);
1044
1045 /**
1046 * @tc.steps: step2. save receive watermark
1047 * @tc.expected: step2. E_OK
1048 */
1049 WaterMark w1 = 1;
1050 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w1), E_OK);
1051 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q2", "D1", w1), E_OK);
1052 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D2", w1), E_OK);
1053
1054 /**
1055 * @tc.steps: step3. erase peer watermark, make sure data remove in db
1056 * @tc.expected: step3. E_OK
1057 */
1058 Metadata anotherMeta;
1059 ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1060 EXPECT_EQ(anotherMeta.EraseDeviceWaterMark("D1", true), E_OK);
1061
1062 /**
1063 * @tc.steps: step4. get receive watermark
1064 * @tc.expected: step4. E_OK receive watermark is zero
1065 */
1066 WaterMark w2 = -1;
1067 EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q1", "D1", w2), E_OK);
1068 EXPECT_EQ(w2, 0u);
1069 w2 = -1;
1070 EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q2", "D1", w2), E_OK);
1071 EXPECT_EQ(w2, 0u);
1072 w2 = -1;
1073 EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q1", "D2", w2), E_OK);
1074 EXPECT_EQ(w2, w1);
1075 }
1076
1077 /**
1078 * @tc.name: ClearQueryWaterMark 003
1079 * @tc.desc: Test metaData clear watermark busy.
1080 * @tc.type: FUNC
1081 * @tc.require: AR000FN6G9
1082 * @tc.author: zhangqiquan
1083 */
1084 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark003, TestSize.Level1)
1085 {
1086 VirtualSingleVerSyncDBInterface storage;
1087 Metadata meta;
1088 /**
1089 * @tc.steps: step1. initialize meta with storage
1090 * @tc.expected: step1. E_OK
1091 */
1092 int errCode = meta.Initialize(&storage);
1093 ASSERT_EQ(errCode, E_OK);
1094 /**
1095 * @tc.steps: step2. set busy and erase water mark
1096 * @tc.expected: step2. -E_BUSY
1097 */
1098 storage.SetBusy(false, true);
1099 EXPECT_EQ(meta.EraseDeviceWaterMark("DEVICE_ID", true), -E_BUSY);
1100 }
1101
1102 /**
1103 * @tc.name: GetQueryLastTimestamp001
1104 * @tc.desc: Test function of GetQueryLastTimestamp.
1105 * @tc.type: FUNC
1106 * @tc.require: AR000FN6G9
1107 * @tc.author: zhangshijie
1108 */
1109 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryLastTimestamp001, TestSize.Level1)
1110 {
1111 /**
1112 * @tc.steps: step1. initialize meta with nullptr
1113 * @tc.expected: step1. return -E_INVALID_DB
1114 */
1115 Metadata meta;
1116 EXPECT_EQ(meta.Initialize(nullptr), -E_INVALID_DB);
1117
1118 /**
1119 * @tc.steps: step2. initialize meta with storage
1120 * @tc.expected: step2. E_OK
1121 */
1122 VirtualSingleVerSyncDBInterface storage;
1123 int errCode = meta.Initialize(&storage);
1124 ASSERT_EQ(errCode, E_OK);
1125
1126 /**
1127 * @tc.steps: step3. call GetQueryLastTimestamp with a non-exists device
1128 * @tc.expected: step3. return INT64_MAX
1129 */
1130 EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q1"), static_cast<uint64_t>(INT64_MAX));
1131
1132 /**
1133 * @tc.steps: step4. call GetQueryLastTimestamp with device D1 again
1134 * @tc.expected: step4. return 0
1135 */
1136 EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q1"), 0u);
1137
1138 /**
1139 * @tc.steps: step5. call GetQueryLastTimestamp with device D1 and Q2
1140 * @tc.expected: step5. return INT64_MAX
1141 */
1142 EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q2"), static_cast<uint64_t>(INT64_MAX));
1143 }
1144
1145 /**
1146 * @tc.name: GetQueryLastTimestamp002
1147 * @tc.desc: Test Metadata::GetQueryLastTimestamp when timestamp out of INT64 range.
1148 * @tc.type: FUNC
1149 * @tc.author: liuhongyang
1150 */
1151 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryLastTimestamp002, TestSize.Level1)
1152 {
1153 /**
1154 * @tc.steps: step1. initialize storage with fake meta data
1155 * @tc.expected: step1. E_OK
1156 */
1157 const int64_t validInt64Num1 = 3000000000000000000; // a random valid int64 number
1158 const int64_t validInt64Num2 = 3000000000000000001; // a random valid int64 number
1159 // key is queryId, value is: [stored timestamp in db, expect return value of GetQueryLastTimestamp]
1160 std::map<std::string, std::pair<std::string, int64_t>> idValueMap = {
1161 {"regular1", {"3000000000000000000", validInt64Num1}},
1162 {"max", {"9223372036854775807", INT64_MAX}},
1163 {"min", {"-9223372036854775808", INT64_MIN}},
1164 {"overMax", {"9223372036854775808", INT64_MAX}},
1165 {"underMin", {"-9223372036854775809", INT64_MIN}},
1166 {"regular2", {"3000000000000000001", validInt64Num2}}};
1167 Key metaKey;
1168 Value value;
1169 VirtualSingleVerSyncDBInterface storage;
1170 for (auto &pair : idValueMap) {
1171 std::string keyStr = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(pair.first);
1172 DBCommon::StringToVector(keyStr, metaKey);
1173 DBCommon::StringToVector(pair.second.first, value);
1174 EXPECT_EQ(storage.PutMetaData(metaKey, value, false), E_OK);
1175 }
1176 /**
1177 * @tc.steps: step2. call GetQueryLastTimestamp with different query id
1178 * @tc.expected: step2. get the correct return value
1179 */
1180 Metadata meta;
1181 int errCode = meta.Initialize(&storage);
1182 ASSERT_EQ(errCode, E_OK);
1183 for (auto &pair : idValueMap) {
1184 auto &queryId = pair.first;
1185 auto &expectVal = pair.second.second;
1186 EXPECT_EQ(meta.GetQueryLastTimestamp("any", queryId), static_cast<uint64_t>(expectVal));
1187 }
1188 }
1189
1190 /**
1191 * @tc.name: MetaDataExceptionBranch001
1192 * @tc.desc: Test execption branch of meata data.
1193 * @tc.type: FUNC
1194 * @tc.require: AR000FN6G9
1195 * @tc.author: zhangshijie
1196 */
1197 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, MetaDataExceptionBranch001, TestSize.Level1)
1198 {
1199 /**
1200 * @tc.steps: step1. call GetRemoveDataMark with a device not in map
1201 * @tc.expected: step1. out value = 0
1202 */
1203 Metadata meta;
1204 VirtualSingleVerSyncDBInterface storage;
1205 int errCode = meta.Initialize(&storage);
1206 ASSERT_EQ(errCode, E_OK);
1207
1208 uint64_t val = 99; // 99 is the initial value of outValue
1209 uint64_t outValue = val;
1210 meta.GetRemoveDataMark("D1", outValue);
1211 EXPECT_EQ(outValue, 0u);
1212
1213 /**
1214 * @tc.steps: step2. reset outValue, call GetDbCreateTime with a device not in map
1215 * @tc.expected: step2. out value = 0
1216 */
1217 outValue = val;
1218 meta.GetDbCreateTime("D1", outValue);
1219 EXPECT_EQ(outValue, 0u);
1220
1221 /**
1222 * @tc.steps: step3. call ResetMetaDataAfterRemoveData with a device not in map
1223 * @tc.expected: step3. return -E_NOT_FOUND
1224 */
1225 EXPECT_EQ(meta.ResetMetaDataAfterRemoveData("D1"), -E_NOT_FOUND);
1226 }
1227
1228 /**
1229 * @tc.name: GetDeleteKeyWaterMark 001
1230 * @tc.desc: Test metaData save and get deleteWaterMark.
1231 * @tc.type: FUNC
1232 * @tc.require: AR000FN6G9
1233 * @tc.author: zhangqiquan
1234 */
1235 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteKeyWaterMark001, TestSize.Level1)
1236 {
1237 VirtualSingleVerSyncDBInterface storage;
1238 Metadata meta;
1239
1240 /**
1241 * @tc.steps: step1. initialize meta with storage
1242 * @tc.expected: step1. E_OK
1243 */
1244 int errCode = meta.Initialize(&storage);
1245 ASSERT_EQ(errCode, E_OK);
1246
1247 /**
1248 * @tc.steps: step2. save receive and send watermark
1249 * @tc.expected: step2. E_OK
1250 */
1251 WaterMark w1 = 1;
1252 EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", w1), E_OK);
1253 EXPECT_EQ(meta.SetSendDeleteSyncWaterMark("D1", w1), E_OK);
1254
1255 /**
1256 * @tc.steps: step3. get receive and send watermark
1257 * @tc.expected: step3. E_OK and get the latest value
1258 */
1259 WaterMark w = 0;
1260 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w), E_OK);
1261 EXPECT_EQ(w1, w);
1262 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", w), E_OK);
1263 EXPECT_EQ(w1, w);
1264
1265 /**
1266 * @tc.steps: step4. set peer and local watermark
1267 * @tc.expected: step4. E_OK
1268 */
1269 WaterMark w2 = 2;
1270 EXPECT_EQ(meta.SaveLocalWaterMark("D1", w2), E_OK);
1271 EXPECT_EQ(meta.SavePeerWaterMark("D1", w2, true), E_OK);
1272
1273 /**
1274 * @tc.steps: step5. get receive and send watermark
1275 * @tc.expected: step5. E_OK and get the w1
1276 */
1277 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w), E_OK);
1278 EXPECT_EQ(w2, w);
1279 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", w), E_OK);
1280 EXPECT_EQ(w2, w);
1281
1282 /**
1283 * @tc.steps: step6. set peer and local watermark
1284 * @tc.expected: step6. E_OK
1285 */
1286 WaterMark w3 = 3;
1287 EXPECT_EQ(meta.SaveLocalWaterMark("D2", w3), E_OK);
1288 EXPECT_EQ(meta.SavePeerWaterMark("D2", w3, true), E_OK);
1289
1290 /**
1291 * @tc.steps: step7. get receive and send watermark
1292 * @tc.expected: step7. E_OK and get the w3
1293 */
1294 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D2", w), E_OK);
1295 EXPECT_EQ(w3, w);
1296 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D2", w), E_OK);
1297 EXPECT_EQ(w3, w);
1298
1299 /**
1300 * @tc.steps: step8. get not exit receive and send watermark
1301 * @tc.expected: step8. E_OK and get the 0
1302 */
1303 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D3", w), E_OK);
1304 EXPECT_EQ(w, 0u);
1305 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D3", w), E_OK);
1306 EXPECT_EQ(w, 0u);
1307 }
1308
1309 /**
1310 * @tc.name: GetDeleteKeyWaterMark 002
1311 * @tc.desc: Test metaData save and get deleteWaterMark after push or pull mode.
1312 * @tc.type: FUNC
1313 * @tc.require: AR000FN6G9
1314 * @tc.author: zhangqiquan
1315 */
1316 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteKeyWaterMark002, TestSize.Level1)
1317 {
1318 VirtualSingleVerSyncDBInterface storage;
1319 Metadata meta;
1320
1321 /**
1322 * @tc.steps: step1. initialize meta with storage
1323 * @tc.expected: step1. E_OK
1324 */
1325 int errCode = meta.Initialize(&storage);
1326 ASSERT_EQ(errCode, E_OK);
1327
1328 /**
1329 * @tc.steps: step2. set peer and local watermark
1330 * @tc.expected: step2. E_OK
1331 */
1332 WaterMark w1 = 3;
1333 EXPECT_EQ(meta.SaveLocalWaterMark("D1", w1), E_OK);
1334 EXPECT_EQ(meta.SavePeerWaterMark("D1", w1, true), E_OK);
1335
1336 /**
1337 * @tc.steps: step2. save receive and send watermark
1338 * @tc.expected: step2. E_OK
1339 */
1340 WaterMark w2 = 1;
1341 EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", w2), E_OK);
1342 EXPECT_EQ(meta.SetSendDeleteSyncWaterMark("D1", w2), E_OK);
1343
1344 /**
1345 * @tc.steps: step3. get receive and send watermark
1346 * @tc.expected: step3. E_OK and get the bigger value
1347 */
1348 WaterMark w = 0;
1349 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w), E_OK);
1350 EXPECT_EQ(w1, w);
1351 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", w), E_OK);
1352 EXPECT_EQ(w1, w);
1353 }
1354
1355 /**
1356 * @tc.name: ClearDeleteKeyWaterMark 001
1357 * @tc.desc: Test metaData clear watermark function.
1358 * @tc.type: FUNC
1359 * @tc.require: AR000FN6G9
1360 * @tc.author: zhangqiquan
1361 */
1362 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearDeleteKeyWaterMark001, TestSize.Level1)
1363 {
1364 VirtualSingleVerSyncDBInterface storage;
1365 Metadata meta;
1366
1367 /**
1368 * @tc.steps: step1. initialize meta with storage
1369 * @tc.expected: step1. E_OK
1370 */
1371 int errCode = meta.Initialize(&storage);
1372 ASSERT_EQ(errCode, E_OK);
1373
1374 /**
1375 * @tc.steps: step2. save receive watermark
1376 * @tc.expected: step2. E_OK
1377 */
1378 WaterMark w1 = 1;
1379 EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", w1), E_OK);
1380
1381 /**
1382 * @tc.steps: step3. erase peer watermark
1383 * @tc.expected: step3. E_OK
1384 */
1385 EXPECT_EQ(meta.EraseDeviceWaterMark("D1", true), E_OK);
1386
1387 /**
1388 * @tc.steps: step4. get receive watermark
1389 * @tc.expected: step4. E_OK receive watermark is zero
1390 */
1391 WaterMark w2 = -1;
1392 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w2), E_OK);
1393 EXPECT_EQ(w2, 0u);
1394
1395 /**
1396 * @tc.steps: step5. set peer watermark
1397 * @tc.expected: step5. E_OK
1398 */
1399 WaterMark w3 = 2;
1400 EXPECT_EQ(meta.SavePeerWaterMark("D1", w3, true), E_OK);
1401
1402 /**
1403 * @tc.steps: step6. get receive watermark
1404 * @tc.expected: step6. E_OK receive watermark is peer watermark
1405 */
1406 WaterMark w4 = -1;
1407 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w4), E_OK);
1408 EXPECT_EQ(w4, w3);
1409 }
1410
1411 /**
1412 * @tc.name: VerifyCacheAndDb 001
1413 * @tc.desc: Test metaData watermark cache and db are consistent and correct.
1414 * @tc.type: FUNC
1415 * @tc.require: AR000FN6G9
1416 * @tc.author: zhangqiquan
1417 */
1418 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDataQuerySync001, TestSize.Level1)
1419 {
1420 Metadata meta;
1421 VirtualSingleVerSyncDBInterface storage;
1422
1423 /**
1424 * @tc.steps: step1. initialize meta with storage
1425 * @tc.expected: step1. E_OK
1426 */
1427 int errCode = meta.Initialize(&storage);
1428 ASSERT_EQ(errCode, E_OK);
1429
1430 const std::string deviceId = "D1";
1431 const std::string queryId = "Q1";
1432
1433 /**
1434 * @tc.steps: step2. save deleteSync watermark
1435 * @tc.expected: step2. E_OK
1436 */
1437 WaterMark deleteWaterMark = 1;
1438 EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark(deviceId, deleteWaterMark), E_OK);
1439 EXPECT_EQ(meta.SetSendDeleteSyncWaterMark(deviceId, deleteWaterMark), E_OK);
1440
1441 /**
1442 * @tc.steps: step3. save querySync watermark
1443 * @tc.expected: step2. E_OK
1444 */
1445 WaterMark queryWaterMark = 2;
1446 EXPECT_EQ(meta.SetRecvQueryWaterMark(queryId, deviceId, queryWaterMark), E_OK);
1447 EXPECT_EQ(meta.SetSendQueryWaterMark(queryId, deviceId, queryWaterMark), E_OK);
1448
1449 /**
1450 * @tc.steps: step4. initialize meta with storage
1451 * @tc.expected: step4. E_OK
1452 */
1453 Metadata anotherMeta;
1454 ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1455
1456 /**
1457 * @tc.steps: step5. verify delete sync data
1458 * @tc.expected: step5. E_OK and waterMark equal to deleteWaterMark
1459 */
1460 WaterMark waterMark;
1461 EXPECT_EQ(anotherMeta.GetRecvDeleteSyncWaterMark(deviceId, waterMark), E_OK);
1462 EXPECT_EQ(waterMark, deleteWaterMark);
1463 EXPECT_EQ(anotherMeta.GetSendDeleteSyncWaterMark(deviceId, waterMark), E_OK);
1464 EXPECT_EQ(waterMark, deleteWaterMark);
1465
1466 /**
1467 * @tc.steps: step6. verify query sync data
1468 * @tc.expected: step6. E_OK and waterMark equal to queryWaterMark
1469 */
1470 EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark(queryId, deviceId, waterMark), E_OK);
1471 EXPECT_EQ(waterMark, queryWaterMark);
1472 EXPECT_EQ(anotherMeta.GetSendQueryWaterMark(queryId, deviceId, waterMark), E_OK);
1473 EXPECT_EQ(waterMark, queryWaterMark);
1474 }
1475
1476 /**
1477 * @tc.name: VerifyLruMap 001
1478 * @tc.desc: Test metaData watermark cache lru ability.
1479 * @tc.type: FUNC
1480 * @tc.require: AR000FN6G9
1481 * @tc.author: zhangqiquan
1482 */
1483 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyLruMap001, TestSize.Level1)
1484 {
1485 LruMap<std::string, QueryWaterMark> lruMap;
1486 const int maxCacheItems = 200;
1487
1488 /**
1489 * @tc.steps: step1. fill items to LruMap
1490 * @tc.expected: step1. E_OK
1491 */
1492 const int startCount = 0;
1493 for (int i = startCount; i < maxCacheItems; i++) {
1494 std::string key = std::to_string(i);
1495 QueryWaterMark value;
1496 value.recvWaterMark = static_cast<uint64_t>(i + 1);
1497 EXPECT_EQ(lruMap.Put(key, value), E_OK);
1498 }
1499
1500 /**
1501 * @tc.steps: step2. get the first item
1502 * @tc.expected: step2. E_OK first item will move to last
1503 */
1504 std::string firstItemKey = std::to_string(startCount);
1505 QueryWaterMark firstItemValue;
1506 EXPECT_EQ(lruMap.Get(firstItemKey, firstItemValue), E_OK);
1507 EXPECT_EQ(firstItemValue.recvWaterMark, 1u);
1508
1509 /**
1510 * @tc.steps: step3. insert new items to LruMap
1511 * @tc.expected: step3. the second items was removed
1512 */
1513 std::string key = std::to_string(maxCacheItems);
1514 QueryWaterMark value;
1515 value.recvWaterMark = maxCacheItems;
1516 EXPECT_EQ(lruMap.Put(key, value), E_OK);
1517
1518 /**
1519 * @tc.steps: step4. get the second item
1520 * @tc.expected: step4. E_NOT_FOUND it was removed by algorithm
1521 */
1522 std::string secondItemKey = std::to_string(startCount + 1);
1523 QueryWaterMark secondItemValue;
1524 EXPECT_EQ(lruMap.Get(secondItemKey, secondItemValue), -E_NOT_FOUND);
1525 EXPECT_EQ(secondItemValue.recvWaterMark, 0u);
1526 }
1527
1528 /**
1529 * @tc.name: VerifyMetaDataInit 001
1530 * @tc.desc: Test metaData init correctly
1531 * @tc.type: FUNC
1532 * @tc.require: AR000FN6G9
1533 * @tc.author: zhangqiquan
1534 */
1535 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDataInit001, TestSize.Level1)
1536 {
1537 Metadata meta;
1538 VirtualSingleVerSyncDBInterface storage;
1539
1540 /**
1541 * @tc.steps: step1. initialize meta with storage
1542 * @tc.expected: step1. E_OK
1543 */
1544 ASSERT_EQ(meta.Initialize(&storage), E_OK);
1545
1546 DeviceID deviceA = "DeviceA";
1547 DeviceID deviceB = "DeviceA";
1548 WaterMark setWaterMark = 1;
1549
1550 /**
1551 * @tc.steps: step2. meta save and get waterMark
1552 * @tc.expected: step2. expect get the same waterMark
1553 */
1554 EXPECT_EQ(meta.SaveLocalWaterMark(deviceA, setWaterMark), E_OK);
1555 EXPECT_EQ(meta.SaveLocalWaterMark(deviceB, setWaterMark), E_OK);
1556 WaterMark getWaterMark = 0;
1557 meta.GetLocalWaterMark(deviceA, getWaterMark);
1558 EXPECT_EQ(getWaterMark, setWaterMark);
1559 meta.GetLocalWaterMark(deviceB, getWaterMark);
1560 EXPECT_EQ(getWaterMark, setWaterMark);
1561
1562
1563 /**
1564 * @tc.steps: step3. init again
1565 * @tc.expected: step3. E_OK
1566 */
1567 Metadata anotherMeta;
1568 ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1569
1570 /**
1571 * @tc.steps: step4. get waterMark again
1572 * @tc.expected: step4. expect get the same waterMark
1573 */
1574 anotherMeta.GetLocalWaterMark(deviceA, getWaterMark);
1575 EXPECT_EQ(getWaterMark, setWaterMark);
1576 anotherMeta.GetLocalWaterMark(deviceB, getWaterMark);
1577 EXPECT_EQ(getWaterMark, setWaterMark);
1578 }
1579
1580 namespace {
InitVerifyStorageEnvironment(Metadata & meta,VirtualSingleVerSyncDBInterface & storage,const std::string & deviceId,const int & startCount,const uint32_t & maxStoreItems)1581 void InitVerifyStorageEnvironment(Metadata &meta, VirtualSingleVerSyncDBInterface &storage,
1582 const std::string &deviceId, const int &startCount, const uint32_t &maxStoreItems)
1583 {
1584 /**
1585 * @tc.steps: step1. initialize meta with storage
1586 * @tc.expected: step1. E_OK
1587 */
1588 ASSERT_EQ(meta.Initialize(&storage), E_OK);
1589
1590 /**
1591 * @tc.steps: step2. fill items to metadata
1592 * @tc.expected: step2. E_OK
1593 */
1594 for (uint32_t i = startCount; i < maxStoreItems; i++) {
1595 std::string queryId = std::to_string(i);
1596 WaterMark recvWaterMark = i + 1;
1597 EXPECT_EQ(meta.SetRecvQueryWaterMark(queryId, deviceId, recvWaterMark), E_OK);
1598 }
1599 }
1600 }
1601
1602 /**
1603 * @tc.name: VerifyManagerQuerySyncStorage 001
1604 * @tc.desc: Test metaData remove least used querySync storage items.
1605 * @tc.type: FUNC
1606 * @tc.require: AR000FN6G9
1607 * @tc.author: zhangqiquan
1608 */
1609 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyManagerQuerySyncStorage001, TestSize.Level3)
1610 {
1611 Metadata meta;
1612 VirtualSingleVerSyncDBInterface storage;
1613 const uint32_t maxStoreItems = 100000;
1614 const int startCount = 0;
1615 const std::string deviceId = "Device";
1616
1617 InitVerifyStorageEnvironment(meta, storage, deviceId, startCount, maxStoreItems);
1618
1619 /**
1620 * @tc.steps: step3. insert new items to metadata
1621 * @tc.expected: step3. E_OK
1622 */
1623 std::string newQueryId = std::to_string(maxStoreItems);
1624 WaterMark newWaterMark = maxStoreItems + 1;
1625 EXPECT_EQ(meta.SetRecvQueryWaterMark(newQueryId, deviceId, newWaterMark), E_OK);
1626
1627 /**
1628 * @tc.steps: step4. touch the first item
1629 * @tc.expected: step4. E_OK update first item used time
1630 */
1631 std::string firstItemKey = std::to_string(startCount);
1632 WaterMark firstWaterMark = 11u;
1633 EXPECT_EQ(meta.SetRecvQueryWaterMark(firstItemKey, deviceId, firstWaterMark), E_OK);
1634
1635 /**
1636 * @tc.steps: step5. initialize new meta with storage
1637 * @tc.expected: step5. the second item will be removed
1638 */
1639 Metadata newMeta;
1640 ASSERT_EQ(newMeta.Initialize(&storage), E_OK);
1641
1642 /**
1643 * @tc.steps: step6. touch the first item
1644 * @tc.expected: step6. E_OK it still exist
1645 */
1646 WaterMark exceptWaterMark;
1647 EXPECT_EQ(newMeta.GetRecvQueryWaterMark(firstItemKey, deviceId, exceptWaterMark), E_OK);
1648 EXPECT_EQ(exceptWaterMark, firstWaterMark);
1649
1650 /**
1651 * @tc.steps: step7. get the second item
1652 * @tc.expected: step7. NOT_FOUND secondWaterMark is zero
1653 */
1654 WaterMark secondWaterMark;
1655 std::string secondQueryId = std::to_string(startCount + 1);
1656 EXPECT_EQ(newMeta.GetRecvQueryWaterMark(secondQueryId, deviceId, secondWaterMark), E_OK);
1657 EXPECT_EQ(secondWaterMark, 0u);
1658 }
1659
1660 /**
1661 * @tc.name: VerifyMetaDbCreateTime 001
1662 * @tc.desc: Test metaData get and set cbCreateTime.
1663 * @tc.type: FUNC
1664 * @tc.require: AR000FN6G9
1665 * @tc.author: zhuwentao
1666 */
1667 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDbCreateTime001, TestSize.Level1)
1668 {
1669 Metadata meta;
1670 VirtualSingleVerSyncDBInterface storage;
1671 /**
1672 * @tc.steps: step1. initialize meta with storage
1673 * @tc.expected: step1. E_OK
1674 */
1675 int errCode = meta.Initialize(&storage);
1676 ASSERT_EQ(errCode, E_OK);
1677 /**
1678 * @tc.steps: step2. set local and peer watermark and dbCreateTime
1679 * @tc.expected: step4. E_OK
1680 */
1681 WaterMark value = 2;
1682 EXPECT_EQ(meta.SaveLocalWaterMark("D1", value), E_OK);
1683 EXPECT_EQ(meta.SavePeerWaterMark("D1", value, true), E_OK);
1684 EXPECT_EQ(meta.SetDbCreateTime("D1", 10u, true), E_OK);
1685 /**
1686 * @tc.steps: step3. check peer and local watermark and dbCreateTime
1687 * @tc.expected: step4. E_OK
1688 */
1689 WaterMark curValue = 0;
1690 meta.GetLocalWaterMark("D1", curValue);
1691 EXPECT_EQ(value, curValue);
1692 meta.GetPeerWaterMark("D1", curValue);
1693 EXPECT_EQ(value, curValue);
1694 uint64_t curDbCreatTime = 0;
1695 meta.GetDbCreateTime("D1", curDbCreatTime);
1696 EXPECT_EQ(curDbCreatTime, 10u);
1697 /**
1698 * @tc.steps: step3. change dbCreateTime and check
1699 * @tc.expected: step4. E_OK
1700 */
1701 EXPECT_EQ(meta.SetDbCreateTime("D1", 20u, true), E_OK);
1702 uint64_t clearDeviceDataMark = INT_MAX;
1703 meta.GetRemoveDataMark("D1", clearDeviceDataMark);
1704 EXPECT_EQ(clearDeviceDataMark, 1u);
1705 EXPECT_EQ(meta.ResetMetaDataAfterRemoveData("D1"), E_OK);
1706 meta.GetRemoveDataMark("D1", clearDeviceDataMark);
1707 EXPECT_EQ(clearDeviceDataMark, 0u);
1708 meta.GetDbCreateTime("D1", curDbCreatTime);
1709 EXPECT_EQ(curDbCreatTime, 20u);
1710 }
1711
1712 /**
1713 * @tc.name: VerifyManagerQuerySyncStorage 002
1714 * @tc.desc: Test metaData remove least used querySync storage items when exit wrong data.
1715 * @tc.type: FUNC
1716 * @tc.require: AR000FN6G9
1717 * @tc.author: zhangqiquan
1718 */
1719 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyManagerQuerySyncStorage002, TestSize.Level3)
1720 {
1721 Metadata meta;
1722 VirtualSingleVerSyncDBInterface storage;
1723 const uint32_t maxStoreItems = 100000;
1724 const int startCount = 0;
1725 const std::string deviceId = "Device";
1726
1727 InitVerifyStorageEnvironment(meta, storage, deviceId, startCount, maxStoreItems);
1728
1729 /**
1730 * @tc.steps: step3. insert a wrong Value
1731 * @tc.expected: step3. E_OK
1732 */
1733 std::string newQueryId = std::to_string(maxStoreItems);
1734 Key dbKey;
1735 DBCommon::StringToVector(QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey()
1736 + DBCommon::TransferHashString(deviceId) + newQueryId, dbKey);
1737 Value wrongValue;
1738 EXPECT_EQ(storage.PutMetaData(dbKey, wrongValue, false), E_OK);
1739
1740 /**
1741 * @tc.steps: step4. initialize new meta with storage
1742 * @tc.expected: step4. E_OK
1743 */
1744 Metadata newMeta;
1745 ASSERT_EQ(newMeta.Initialize(&storage), E_OK);
1746
1747 /**
1748 * @tc.steps: step5. touch the first item
1749 * @tc.expected: step5. E_OK still exit
1750 */
1751 std::string firstItemKey = std::to_string(startCount);
1752 WaterMark exceptWaterMark;
1753 EXPECT_EQ(newMeta.GetRecvQueryWaterMark(firstItemKey, deviceId, exceptWaterMark), E_OK);
1754 EXPECT_EQ(exceptWaterMark, 1u);
1755 }
1756
1757 /**
1758 * @tc.name: AllPredicateQuerySync001
1759 * @tc.desc: Test normal push sync for AllPredicate data.
1760 * @tc.type: FUNC
1761 * @tc.require: AR000FN6G9
1762 * @tc.author: zhuwentao
1763 */
1764 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync001, TestSize.Level1)
1765 {
1766 /**
1767 * @tc.steps: step1. InitSchemaDb
1768 */
1769 InitSchemaDb();
1770 DBStatus status = OK;
1771 std::vector<std::string> devices;
1772 devices.push_back(g_deviceB->GetDeviceId());
1773
1774 /**
1775 * @tc.steps: step2. deviceA put {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1776 {key21, SCHEMA_VALUE2} - {key29, SCHEMA_VALUE2}
1777 */
1778 Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1779 Value value2(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end());
1780 Key key = {'1'};
1781 Key key2 = {'2'};
1782 const int dataSize = 4000;
1783 for (int i = 0; i < dataSize; i++) {
1784 key.push_back(i);
1785 key2.push_back(i);
1786 status = g_schemaKvDelegatePtr->Put(key, value);
1787 ASSERT_TRUE(status == OK);
1788 status = g_schemaKvDelegatePtr->Put(key2, value2);
1789 ASSERT_TRUE(status == OK);
1790 key.pop_back();
1791 key2.pop_back();
1792 }
1793 ASSERT_TRUE(status == OK);
1794
1795 /**
1796 * @tc.steps: step3. deviceA call query sync and wait
1797 * @tc.expected: step3. sync should return OK.
1798 */
1799 Query query = Query::Select().EqualTo("$.field_name1", 1);
1800 std::map<std::string, DBStatus> result;
1801 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1802 ASSERT_TRUE(status == OK);
1803
1804 /**
1805 * @tc.expected: step4. onComplete should be called, DeviceB have {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1806 */
1807 ASSERT_TRUE(result.size() == devices.size());
1808 for (const auto &pair : result) {
1809 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1810 EXPECT_TRUE(pair.second == OK);
1811 }
1812 VirtualDataItem item;
1813 VirtualDataItem item2;
1814 for (int i = 0; i < dataSize; i++) {
1815 key.push_back(i);
1816 key2.push_back(i);
1817 g_deviceB->GetData(key, item);
1818 EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
1819 EXPECT_TRUE(item.value == value);
1820 key.pop_back();
1821 key2.pop_back();
1822 }
1823 }
1824
1825 /**
1826 * @tc.name: AllPredicateQuerySync002
1827 * @tc.desc: Test wrong query param push sync for AllPredicate data.
1828 * @tc.type: FUNC
1829 * @tc.require: AR000FN6G9
1830 * @tc.author: zhuwentao
1831 */
1832 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync002, TestSize.Level1)
1833 {
1834 /**
1835 * @tc.steps: step1. InitSchemaDb
1836 */
1837 InitSchemaDb();
1838 DBStatus status = OK;
1839 std::vector<std::string> devices;
1840 devices.push_back(g_deviceB->GetDeviceId());
1841
1842 /**
1843 * @tc.steps: step2. deviceA call query sync and wait
1844 * @tc.expected: step2. sync should return INVALID_QUERY_FIELD
1845 */
1846 Query query = Query::Select().GreaterThan("field_name11", 10);
1847 std::map<std::string, DBStatus> result;
1848 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1849 ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1850 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, query);
1851 ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1852 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
1853 ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1854 }
1855
1856 /**
1857 * @tc.name: AllPredicateQuerySync003
1858 * @tc.desc: Test normal push sync for AllPredicate data with limit
1859 * @tc.type: FUNC
1860 * @tc.require: AR000FN6G9
1861 * @tc.author: zhuwentao
1862 */
1863 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync003, TestSize.Level1)
1864 {
1865 /**
1866 * @tc.steps: step1. InitSchemaDb
1867 */
1868 InitSchemaDb();
1869 DBStatus status = OK;
1870 std::vector<std::string> devices;
1871 devices.push_back(g_deviceB->GetDeviceId());
1872
1873 /**
1874 * @tc.steps: step2. deviceA put {key1, SCHEMA_VALUE1} - {key9, SCHEMA_VALUE1}
1875 */
1876 Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1877 Value value2(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end());
1878 Key key = {'1'};
1879 Key key2 = {'2'};
1880 const int dataSize = 10;
1881 for (int i = 0; i < dataSize; i++) {
1882 key.push_back(i);
1883 key2.push_back(i);
1884 status = g_schemaKvDelegatePtr->Put(key, value);
1885 ASSERT_TRUE(status == OK);
1886 status = g_schemaKvDelegatePtr->Put(key2, value2);
1887 ASSERT_TRUE(status == OK);
1888 key.pop_back();
1889 key2.pop_back();
1890 }
1891 ASSERT_TRUE(status == OK);
1892
1893 /**
1894 * @tc.steps: step3. deviceA call query sync with limit and wait
1895 * @tc.expected: step3. sync should return OK.
1896 */
1897 Query query = Query::Select().EqualTo("$.field_name1", 1).Limit(20, 0);
1898 std::map<std::string, DBStatus> result;
1899 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1900 ASSERT_TRUE(status == OK);
1901
1902 /**
1903 * @tc.expected: step4. onComplete should be called, DeviceB have {key1, SCHEMA_VALUE1} - {key9, SCHEMA_VALUE1}
1904 */
1905 ASSERT_TRUE(result.size() == devices.size());
1906 for (const auto &pair : result) {
1907 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1908 EXPECT_TRUE(pair.second == OK);
1909 }
1910 VirtualDataItem item;
1911 VirtualDataItem item2;
1912 for (int i = 0; i < dataSize; i++) {
1913 key.push_back(i);
1914 key2.push_back(i);
1915 g_deviceB->GetData(key, item);
1916 EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
1917 EXPECT_TRUE(item.value == value);
1918 key.pop_back();
1919 key2.pop_back();
1920 }
1921 }
1922
1923 /**
1924 * @tc.name: AllPredicateQuerySync004
1925 * @tc.desc: Test normal pull sync for AllPredicate data.
1926 * @tc.type: FUNC
1927 * @tc.require: AR000FN6G9
1928 * @tc.author: zhuwentao
1929 */
1930 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync004, TestSize.Level1)
1931 {
1932 /**
1933 * @tc.steps: step1. InitSchemaDb
1934 */
1935 InitSchemaDb();
1936 DBStatus status = OK;
1937 std::vector<std::string> devices;
1938 devices.push_back(g_deviceB->GetDeviceId());
1939
1940 /**
1941 * @tc.steps: step2. deviceB put {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1942 */
1943 Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1944 Key key = {'1'};
1945 const int dataSize = 10;
1946 for (int i = 0; i < dataSize; i++) {
1947 key.push_back(i);
1948 g_deviceB->PutData(key, value, 10 + i, 0);
1949 ASSERT_TRUE(status == OK);
1950 key.pop_back();
1951 }
1952 ASSERT_TRUE(status == OK);
1953
1954 /**
1955 * @tc.steps: step3. deviceA call query sync and wait
1956 * @tc.expected: step3. sync should return OK.
1957 */
1958 Query query = Query::Select().EqualTo("$.field_name1", 1);
1959 std::map<std::string, DBStatus> result;
1960 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
1961 ASSERT_TRUE(status == OK);
1962
1963 /**
1964 * @tc.expected: step4. onComplete should be called, DeviceA have {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1965 */
1966 ASSERT_TRUE(result.size() == devices.size());
1967 for (const auto &pair : result) {
1968 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1969 EXPECT_TRUE(pair.second == OK);
1970 }
1971 Value item;
1972 Value item2;
1973 for (int i = 0; i < dataSize; i++) {
1974 key.push_back(i);
1975 g_schemaKvDelegatePtr->Get(key, item);
1976 EXPECT_TRUE(item == value);
1977 key.pop_back();
1978 }
1979 }