1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17 #include <thread>
18
19 #include "db_common.h"
20 #include "db_constant.h"
21 #include "distributeddb_data_generate_unit_test.h"
22 #include "distributeddb_tools_unit_test.h"
23 #include "generic_single_ver_kv_entry.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "platform_specific.h"
27 #include "query.h"
28 #include "query_sync_object.h"
29 #include "single_ver_data_sync.h"
30 #include "single_ver_serialize_manager.h"
31 #include "sync_types.h"
32 #include "virtual_communicator.h"
33 #include "virtual_communicator_aggregator.h"
34 #include "virtual_single_ver_sync_db_Interface.h"
35
36 using namespace testing::ext;
37 using namespace DistributedDB;
38 using namespace DistributedDBUnitTest;
39 using namespace std;
40
41 namespace {
42 string g_testDir;
43 const string STORE_ID = "kv_store_sync_test";
44 const string SCHEMA_STORE_ID = "kv_store_sync_schema_test";
45 const std::string DEVICE_B = "deviceB";
46
47 KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
48 KvStoreDelegateManager g_schemaMgr(SCHEMA_APP_ID, USER_ID);
49 KvStoreConfig g_config;
50 DistributedDBToolsUnitTest g_tool;
51 DBStatus g_kvDelegateStatus = INVALID_ARGS;
52 DBStatus g_schemaKvDelegateStatus = INVALID_ARGS;
53 KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
54 KvStoreNbDelegate* g_schemaKvDelegatePtr = nullptr;
55 VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
56 KvVirtualDevice *g_deviceB = nullptr;
57
58 // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
59 auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
60 placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
61 auto g_schemaKvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
62 placeholders::_1, placeholders::_2, std::ref(g_schemaKvDelegateStatus), std::ref(g_schemaKvDelegatePtr));
63 const string SCHEMA_STRING =
64 "{\"SCHEMA_VERSION\":\"1.0\","
65 "\"SCHEMA_MODE\":\"STRICT\","
66 "\"SCHEMA_DEFINE\":{"
67 "\"field_name1\":\"BOOL\","
68 "\"field_name2\":\"BOOL\","
69 "\"field_name3\":\"INTEGER, NOT NULL\","
70 "\"field_name4\":\"LONG, DEFAULT 100\","
71 "\"field_name5\":\"DOUBLE, NOT NULL, DEFAULT 3.14\","
72 "\"field_name6\":\"STRING, NOT NULL, DEFAULT '3.1415'\","
73 "\"field_name7\":\"LONG, DEFAULT 100\","
74 "\"field_name8\":\"LONG, DEFAULT 100\","
75 "\"field_name9\":\"LONG, DEFAULT 100\","
76 "\"field_name10\":\"LONG, DEFAULT 100\""
77 "},"
78 "\"SCHEMA_INDEXES\":[\"$.field_name1\", \"$.field_name2\"]}";
79
80 const std::string SCHEMA_VALUE1 =
81 "{\"field_name1\":true,"
82 "\"field_name2\":false,"
83 "\"field_name3\":10,"
84 "\"field_name4\":20,"
85 "\"field_name5\":3.14,"
86 "\"field_name6\":\"3.1415\","
87 "\"field_name7\":100,"
88 "\"field_name8\":100,"
89 "\"field_name9\":100,"
90 "\"field_name10\":100}";
91
92 const std::string SCHEMA_VALUE2 =
93 "{\"field_name1\":false,"
94 "\"field_name2\":true,"
95 "\"field_name3\":100,"
96 "\"field_name4\":200,"
97 "\"field_name5\":3.14,"
98 "\"field_name6\":\"3.1415\","
99 "\"field_name7\":100,"
100 "\"field_name8\":100,"
101 "\"field_name9\":100,"
102 "\"field_name10\":100}";
103 }
104
105 class DistributedDBSingleVerP2PQuerySyncTest : public testing::Test {
106 public:
107 static void SetUpTestCase(void);
108 static void TearDownTestCase(void);
109 void SetUp();
110 void TearDown();
111 };
112
SetUpTestCase(void)113 void DistributedDBSingleVerP2PQuerySyncTest::SetUpTestCase(void)
114 {
115 /**
116 * @tc.setup: Init datadir and Virtual Communicator.
117 */
118 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
119 string dir = g_testDir + "/single_ver";
120 DIR* dirTmp = opendir(dir.c_str());
121 if (dirTmp == nullptr) {
122 OS::MakeDBDirectory(dir);
123 } else {
124 closedir(dirTmp);
125 }
126
127 g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
128 ASSERT_TRUE(g_communicatorAggregator != nullptr);
129 RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
130 }
131
TearDownTestCase(void)132 void DistributedDBSingleVerP2PQuerySyncTest::TearDownTestCase(void)
133 {
134 /**
135 * @tc.teardown: Release virtual Communicator and clear data dir.
136 */
137 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
138 LOGE("rm test db files error!");
139 }
140 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
141 }
142
SetUp(void)143 void DistributedDBSingleVerP2PQuerySyncTest::SetUp(void)
144 {
145 DistributedDBToolsUnitTest::PrintTestCaseInfo();
146 /**
147 * @tc.setup: create virtual device B and get a KvStoreNbDelegate as deviceA
148 */
149 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
150 ASSERT_TRUE(g_deviceB != nullptr);
151 VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
152 ASSERT_TRUE(syncInterfaceB != nullptr);
153 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
154 }
155
TearDown(void)156 void DistributedDBSingleVerP2PQuerySyncTest::TearDown(void)
157 {
158 /**
159 * @tc.teardown: Release device A, B
160 */
161 if (g_kvDelegatePtr != nullptr) {
162 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
163 g_kvDelegatePtr = nullptr;
164 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
165 LOGD("delete kv store status %d", status);
166 ASSERT_TRUE(status == OK);
167 }
168 if (g_schemaKvDelegatePtr != nullptr) {
169 ASSERT_EQ(g_schemaMgr.CloseKvStore(g_schemaKvDelegatePtr), OK);
170 g_schemaKvDelegatePtr = nullptr;
171 DBStatus status = g_schemaMgr.DeleteKvStore(SCHEMA_STORE_ID);
172 LOGD("delete kv store status %d", status);
173 ASSERT_TRUE(status == OK);
174 }
175 if (g_deviceB != nullptr) {
176 delete g_deviceB;
177 g_deviceB = nullptr;
178 }
179 PermissionCheckCallbackV2 nullCallback;
180 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
181 }
182
InitNormalDb()183 void InitNormalDb()
184 {
185 g_config.dataDir = g_testDir;
186 g_mgr.SetKvStoreConfig(g_config);
187 KvStoreNbDelegate::Option option;
188 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
189 ASSERT_TRUE(g_kvDelegateStatus == OK);
190 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
191 }
192
InitSchemaDb()193 void InitSchemaDb()
194 {
195 g_config.dataDir = g_testDir;
196 g_schemaMgr.SetKvStoreConfig(g_config);
197 KvStoreNbDelegate::Option option;
198 option.schema = SCHEMA_STRING;
199 g_schemaMgr.GetKvStore(SCHEMA_STORE_ID, option, g_schemaKvDelegateCallback);
200 ASSERT_TRUE(g_schemaKvDelegateStatus == OK);
201 ASSERT_TRUE(g_schemaKvDelegatePtr != nullptr);
202 }
203
204 /**
205 * @tc.name: Normal Sync 001
206 * @tc.desc: Test normal push sync for keyprefix data.
207 * @tc.type: FUNC
208 * @tc.require: AR000FN6G9
209 * @tc.author: xushaohua
210 */
211 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync001, TestSize.Level1)
212 {
213 InitNormalDb();
214 DBStatus status = OK;
215 std::vector<std::string> devices;
216 devices.push_back(g_deviceB->GetDeviceId());
217
218 /**
219 * @tc.steps: step1. deviceA put {k0, v0} - {k9, v9}
220 */
221 Key key = {'1'};
222 Value value = {'1'};
223 const int dataSize = 10;
224 for (int i = 0; i < dataSize; i++) {
225 key.push_back(i);
226 value.push_back(i);
227 status = g_kvDelegatePtr->Put(key, value);
228 ASSERT_TRUE(status == OK);
229 key.pop_back();
230 value.pop_back();
231 }
232 Key key2 = {'2'};
233 Value value2 = {'2'};
234 status = g_kvDelegatePtr->Put(key2, value2);
235 ASSERT_TRUE(status == OK);
236
237 /**
238 * @tc.steps: step2. deviceA call query sync and wait
239 * @tc.expected: step2. sync should return OK.
240 */
241 Query query = Query::Select().PrefixKey(key);
242 std::map<std::string, DBStatus> result;
243 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
244 ASSERT_TRUE(status == OK);
245
246 /**
247 * @tc.expected: step3. onComplete should be called, DeviceB have {k1,v1} - {k9, v9}
248 */
249 ASSERT_TRUE(result.size() == devices.size());
250 for (const auto &pair : result) {
251 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
252 EXPECT_TRUE(pair.second == OK);
253 }
254 VirtualDataItem item;
255 for (int i = 0; i < dataSize; i++) {
256 key.push_back(i);
257 value.push_back(i);
258 g_deviceB->GetData(key, item);
259 EXPECT_TRUE(item.value == value);
260 key.pop_back();
261 value.pop_back();
262 }
263 EXPECT_TRUE(g_deviceB->GetData(key2, item) != E_OK);
264 }
265
266 /**
267 * @tc.name: Normal Sync 002
268 * @tc.desc: Test normal push sync for limit and offset.
269 * @tc.type: FUNC
270 * @tc.require: AR000FN6G9
271 * @tc.author: xushaohua
272 */
273 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync002, TestSize.Level1)
274 {
275 InitNormalDb();
276 DBStatus status = OK;
277 std::vector<std::string> devices;
278 devices.push_back(g_deviceB->GetDeviceId());
279
280 /**
281 * @tc.steps: step1. deviceA put {k0, v0} - {k9, v9}
282 */
283 Key key = {'1'};
284 Value value = {'1'};
285 const int dataSize = 10;
286 for (int i = 0; i < dataSize; i++) {
287 key.push_back(i);
288 value.push_back(i);
289 status = g_kvDelegatePtr->Put(key, value);
290 ASSERT_TRUE(status == OK);
291 key.pop_back();
292 value.pop_back();
293 }
294
295 /**
296 * @tc.steps: step2. deviceA call sync and wait
297 * @tc.expected: step2. sync should return OK.
298 */
299 const int limit = 5;
300 const int offset = 4;
301 Query query = Query::Select().PrefixKey(key).Limit(limit, offset);
302 std::map<std::string, DBStatus> result;
303 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
304 ASSERT_TRUE(status == OK);
305
306 /**
307 * @tc.expected: step3. onComplete should be called, DeviceB have {k4,v4} {k8, v8}
308 */
309 ASSERT_TRUE(result.size() == devices.size());
310 for (const auto &pair : result) {
311 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
312 EXPECT_TRUE(pair.second == OK);
313 }
314
315 VirtualDataItem item;
316 for (int i = limit - 1; i < limit + offset; i++) {
317 key.push_back(i);
318 value.push_back(i);
319 g_deviceB->GetData(key, item);
320 EXPECT_TRUE(item.value == value);
321 key.pop_back();
322 value.pop_back();
323 }
324 }
325
326 /**
327 * @tc.name: Normal Sync 001
328 * @tc.desc: Test normal push_and_pull sync for keyprefix data.
329 * @tc.type: FUNC
330 * @tc.require: AR000FN6G9
331 * @tc.author: zhuwentao
332 */
333 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync003, TestSize.Level1)
334 {
335 InitNormalDb();
336 DBStatus status = OK;
337 std::vector<std::string> devices;
338 devices.push_back(g_deviceB->GetDeviceId());
339
340 /**
341 * @tc.steps: step1. deviceA put {k, v}, {b, v}
342 */
343 Key key = {'1'};
344 Value value = {'1'};
345 const int dataSize = 10;
346 status = g_kvDelegatePtr->Put(key, value);
347 ASSERT_TRUE(status == OK);
348 Key key2 = {'2'};
349 Value value2 = {'2'};
350 status = g_kvDelegatePtr->Put(key2, value2);
351 ASSERT_TRUE(status == OK);
352
353 /**
354 * @tc.steps: step2. deviceB put {b0, v0} - {b9, v9}, {c, v}
355 */
356 for (int i = 0; i < dataSize; i++) {
357 key2.push_back(i);
358 value2.push_back(i);
359 g_deviceB->PutData(key2, value2, 10 + i, 0);
360 key2.pop_back();
361 value2.pop_back();
362 }
363 Key key3 = {'3'};
364 Value value3 = {'3'};
365 g_deviceB->PutData(key3, value3, 20, 0);
366
367 /**
368 * @tc.steps: step2. deviceA call query sync and wait
369 * @tc.expected: step2. sync should return OK.
370 */
371 Query query = Query::Select().PrefixKey(key2);
372 std::map<std::string, DBStatus> result;
373 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, query);
374 ASSERT_TRUE(status == OK);
375
376 /**
377 * @tc.expected: step3. onComplete should be called, DeviceA have {b0, v0} - {b9, v9}, DeviceB have {b, v}
378 */
379 ASSERT_TRUE(result.size() == devices.size());
380 for (const auto &pair : result) {
381 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
382 EXPECT_TRUE(pair.second == OK);
383 }
384 VirtualDataItem item;
385 Value tmpValue;
386 for (int i = 0; i < dataSize; i++) {
387 key2.push_back(i);
388 value2.push_back(i);
389 g_kvDelegatePtr->Get(key2, tmpValue);
390 EXPECT_TRUE(tmpValue == value2);
391 key2.pop_back();
392 value2.pop_back();
393 }
394 EXPECT_TRUE(g_deviceB->GetData(key, item) != E_OK);
395 EXPECT_TRUE(g_deviceB->GetData(key2, item) == E_OK);
396 g_kvDelegatePtr->Get(key3, tmpValue);
397 EXPECT_TRUE(tmpValue != value3);
398 }
399
400 /**
401 * @tc.name: Normal Sync 001
402 * @tc.desc: Test normal pull sync for keyprefix data.
403 * @tc.type: FUNC
404 * @tc.require: AR000FN6G9
405 * @tc.author: zhuwentao
406 */
407 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync004, TestSize.Level1)
408 {
409 InitNormalDb();
410 DBStatus status = OK;
411 std::vector<std::string> devices;
412 devices.push_back(g_deviceB->GetDeviceId());
413 /**
414 * @tc.steps: step1. deviceB put {k1, v1} - {k9, k9}, {b0, v0} - {b9, v9}
415 */
416 Key key = {'1'};
417 Value value = {'1'};
418 const int dataSize = 10;
419 Key key2 = {'2'};
420 Value value2 = {'2'};
421 vector<std::pair<Key, Value>> key1Vec;
422 vector<std::pair<Key, Value>> key2Vec;
423 for (int i = 0; i < dataSize; i++) {
424 Key tmpKey(key);
425 Value tmpValue(value);
426 tmpKey.push_back(i);
427 tmpValue.push_back(i);
428 key1Vec.push_back(pair<Key, Value> {tmpKey, tmpValue});
429 }
430 for (int i = 0; i < dataSize; i++) {
431 Key tmpKey(key2);
432 Value tmpValue(value2);
433 tmpKey.push_back(i);
434 tmpValue.push_back(i);
435 key2Vec.push_back(pair<Key, Value> {tmpKey, tmpValue});
436 }
437 for (int i = 0; i < dataSize; i++) {
438 g_deviceB->PutData(key2Vec[i].first, key2Vec[i].second, 20 + i, 0);
439 g_deviceB->PutData(key1Vec[i].first, key1Vec[i].second, 10 + i, 0);
440 }
441
442 /**
443 * @tc.steps: step2. deviceA call query sync and wait
444 * @tc.expected: step2. sync should return OK.
445 */
446 Query query = Query::Select().PrefixKey(key2);
447 std::map<std::string, DBStatus> result;
448 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
449 ASSERT_TRUE(status == OK);
450
451 /**
452 * @tc.expected: step3. onComplete should be called, DeviceA have {b0, v0} - {b9, v9}
453 */
454 ASSERT_TRUE(result.size() == devices.size());
455 for (const auto &pair : result) {
456 EXPECT_TRUE(pair.second == OK);
457 }
458 VirtualDataItem item;
459 Value tmpValue;
460 for (int i = 0; i < dataSize; i++) {
461 g_kvDelegatePtr->Get(key2Vec[i].first, tmpValue);
462 EXPECT_TRUE(tmpValue == key2Vec[i].second);
463 g_kvDelegatePtr->Get(key1Vec[i].first, tmpValue);
464 EXPECT_TRUE(tmpValue != key1Vec[i].second);
465 }
466 }
467
468 /**
469 * @tc.name: NormalSync005
470 * @tc.desc: Test normal push sync for inkeys query.
471 * @tc.type: FUNC
472 * @tc.require: AR000GOHO7
473 * @tc.author: lidongwei
474 */
475 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync005, TestSize.Level1)
476 {
477 InitNormalDb();
478 std::vector<std::string> devices;
479 devices.push_back(g_deviceB->GetDeviceId());
480
481 /**
482 * @tc.steps: step1. deviceA put K1-K5
483 */
484 ASSERT_EQ(g_kvDelegatePtr->PutBatch(
485 {{KEY_1, VALUE_1}, {KEY_2, VALUE_2}, {KEY_3, VALUE_3}, {KEY_4, VALUE_4}, {KEY_5, VALUE_5}}), OK);
486
487 /**
488 * @tc.steps: step2. deviceA sync K2,K4 and wait
489 * @tc.expected: step2. sync should return OK.
490 */
491 Query query = Query::Select().InKeys({KEY_2, KEY_4});
492 std::map<std::string, DBStatus> result;
493 ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), OK);
494
495 /**
496 * @tc.expected: step3. onComplete should be called.
497 */
498 ASSERT_EQ(result.size(), devices.size());
499 for (const auto &pair : result) {
500 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
501 EXPECT_EQ(pair.second, OK);
502 }
503
504 /**
505 * @tc.steps: step4. deviceB have K2K4 and have no K1K3K5.
506 * @tc.expected: step4. sync should return OK.
507 */
508 VirtualDataItem item;
509 EXPECT_EQ(g_deviceB->GetData(KEY_2, item), E_OK);
510 EXPECT_EQ(item.value, VALUE_2);
511 EXPECT_EQ(g_deviceB->GetData(KEY_4, item), E_OK);
512 EXPECT_EQ(item.value, VALUE_4);
513 EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
514 EXPECT_EQ(g_deviceB->GetData(KEY_3, item), -E_NOT_FOUND);
515 EXPECT_EQ(g_deviceB->GetData(KEY_5, item), -E_NOT_FOUND);
516
517 /**
518 * @tc.steps: step5. deviceA sync with invalid inkeys query
519 * @tc.expected: step5. sync failed and the rc is right.
520 */
521 query = Query::Select().InKeys({});
522 result.clear();
523 ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), INVALID_ARGS);
524
525 std::set<Key> keys;
526 for (uint8_t i = 0; i < DBConstant::MAX_BATCH_SIZE + 1; i++) {
527 Key key = { i };
528 keys.emplace(key);
529 }
530 query = Query::Select().InKeys(keys);
531 result.clear();
532 ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), OVER_MAX_LIMITS);
533
534 query = Query::Select().InKeys({{}});
535 result.clear();
536 ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), INVALID_ARGS);
537 }
538
539 /**
540 * @tc.name: NormalSync006
541 * @tc.desc: Test normal push sync with query by 32 devices;
542 * @tc.type: FUNC
543 * @tc.require:
544 * @tc.author: zhuwentao
545 */
546 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync006, TestSize.Level1)
547 {
548 /**
549 * @tc.steps: step1. init db and 32 devices
550 */
551 InitNormalDb();
552 uint32_t syncDevCount = 1u;
553 std::vector<KvVirtualDevice *> virtualDeviceVec(syncDevCount, nullptr);
554 const std::string device = "deviceTmp_";
555 std::vector<std::string> devices;
556 bool isError = false;
557 for (uint32_t i = 0; i < syncDevCount; i++) {
558 std::string tmpDev = device + std::to_string(i);
559 virtualDeviceVec[i] = new (std::nothrow) KvVirtualDevice(tmpDev);
560 if (virtualDeviceVec[i] == nullptr) {
561 isError = true;
562 break;
563 }
564 VirtualSingleVerSyncDBInterface *tmpSyncInterface = new (std::nothrow) VirtualSingleVerSyncDBInterface();
565 if (tmpSyncInterface == nullptr) {
566 isError = true;
567 break;
568 }
569 ASSERT_EQ(virtualDeviceVec[i]->Initialize(g_communicatorAggregator, tmpSyncInterface), E_OK);
570 devices.push_back(virtualDeviceVec[i]->GetDeviceId());
571 }
572 if (isError) {
573 for (uint32_t i = 0; i < syncDevCount; i++) {
574 if (virtualDeviceVec[i] != nullptr) {
575 delete virtualDeviceVec[i];
576 virtualDeviceVec[i] = nullptr;
577 }
578 }
579 ASSERT_TRUE(false);
580 }
581 /**
582 * @tc.steps: step2. deviceA put {k0, v0}
583 */
584 Key key = {'1'};
585 Value value = {'1'};
586 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
587 /**
588 * @tc.steps: step3. deviceA call query sync and wait
589 * @tc.expected: step3. sync should return OK.
590 */
591 Query query = Query::Select().PrefixKey(key);
592 std::map<std::string, DBStatus> result;
593 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query) == OK);
594
595 /**
596 * @tc.expected: step3. onComplete should be called, DeviceB have {k1,v1} - {k9, v9}
597 */
598 ASSERT_TRUE(result.size() == devices.size());
599 for (const auto &pair : result) {
600 EXPECT_TRUE(pair.second == OK);
601 }
602 VirtualDataItem item;
603 for (uint32_t i = 0; i < syncDevCount; i++) {
604 EXPECT_TRUE(virtualDeviceVec[i]->GetData(key, item) == E_OK);
605 EXPECT_EQ(item.value, value);
606 delete virtualDeviceVec[i];
607 virtualDeviceVec[i] = nullptr;
608 }
609 }
610
611 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, QueryRequestPacketTest001, TestSize.Level1)
612 {
613 /**
614 * @tc.steps: step1. prepare a QuerySyncRequestPacket.
615 */
616 auto packet = new (std::nothrow) DataRequestPacket;
617 ASSERT_TRUE(packet != nullptr);
618 auto kvEntry = new (std::nothrow) GenericSingleVerKvEntry;
619 ASSERT_TRUE(kvEntry != nullptr);
620 kvEntry->SetTimestamp(1);
621 SyncEntry syncData {.entries = {kvEntry}};
622 #ifndef OMIT_ZLIB
623 ASSERT_TRUE(GenericSingleVerKvEntry::Compress(syncData.entries, syncData.compressedEntries,
624 {CompressAlgorithm::ZLIB, SOFTWARE_VERSION_CURRENT}) == E_OK);
625 packet->SetCompressAlgo(CompressAlgorithm::ZLIB);
626 packet->SetFlag(4); // set IS_COMPRESS_DATA flag true
627 #endif
628 packet->SetBasicInfo(-E_NOT_SUPPORT, SOFTWARE_VERSION_CURRENT, SyncModeType::QUERY_PUSH_PULL);
629 packet->SetData(syncData.entries);
630 packet->SetCompressData(syncData.compressedEntries);
631 packet->SetEndWaterMark(INT8_MAX);
632 packet->SetWaterMark(INT16_MAX, INT32_MAX, INT64_MAX);
633 QuerySyncObject syncQuery(Query::Select().PrefixKey({'2'}));
634 packet->SetQuery(syncQuery);
635 packet->SetQueryId(syncQuery.GetIdentify());
636 packet->SetReserved(std::vector<uint64_t> {INT8_MAX});
637
638 /**
639 * @tc.steps: step2. put the QuerySyncRequestPacket into a message.
640 */
641 Message msg;
642 msg.SetExternalObject(packet);
643 msg.SetMessageId(QUERY_SYNC_MESSAGE);
644 msg.SetMessageType(TYPE_REQUEST);
645
646 /**
647 * @tc.steps: step3. Serialization the message to a buffer.
648 */
649 int len = SingleVerSerializeManager::CalculateLen(&msg);
650 vector<uint8_t> buffer(len);
651 ASSERT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), &msg), E_OK);
652
653 /**
654 * @tc.steps: step4. DeSerialization the buffer to a message.
655 */
656 Message outMsg(QUERY_SYNC_MESSAGE);
657 outMsg.SetMessageType(TYPE_REQUEST);
658 ASSERT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), &outMsg), E_OK);
659
660 /**
661 * @tc.steps: step5. checkout the outMsg.
662 * @tc.expected: step5. outMsg equal the the in msg
663 */
664 auto outPacket = outMsg.GetObject<DataRequestPacket>();
665 EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
666 EXPECT_EQ(outPacket->GetMode(), SyncModeType::QUERY_PUSH_PULL);
667 EXPECT_EQ(outPacket->GetEndWaterMark(), static_cast<uint64_t>(INT8_MAX));
668 EXPECT_EQ(outPacket->GetLocalWaterMark(), static_cast<uint64_t>(INT16_MAX));
669 EXPECT_EQ(outPacket->GetPeerWaterMark(), static_cast<uint64_t>(INT32_MAX));
670 EXPECT_EQ(outPacket->GetDeletedWaterMark(), static_cast<uint64_t>(INT64_MAX));
671 #ifndef OMIT_ZLIB
672 EXPECT_EQ(outPacket->GetFlag(), static_cast<uint32_t>(4)); // check IS_COMPRESS_DATA flag true
673 #endif
674 EXPECT_EQ(outPacket->GetQueryId(), syncQuery.GetIdentify());
675 EXPECT_EQ(outPacket->GetReserved(), std::vector<uint64_t> {INT8_MAX});
676 EXPECT_EQ(outPacket->GetSendCode(), -E_NOT_SUPPORT);
677 EXPECT_EQ(outPacket->GetData()[0]->GetTimestamp(), 1u);
678 }
679
680 /**
681 * @tc.name: QueryRequestPacketTest002
682 * @tc.desc: Test exception branch of serialization.
683 * @tc.type: FUNC
684 * @tc.require:
685 * @tc.author: zhangshijie
686 */
687 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, SerializationManager001, TestSize.Level1)
688 {
689 /**
690 * @tc.steps: step1. call SingleVerSerializeManager::Serialization with buffer = nullptr or msg = nullptr
691 * @tc.expected:step1 return -E_MESSAGE_ID_ERROR
692 */
693 Message msg;
694 msg.SetMessageType(TYPE_INVALID);
695 vector<uint8_t> buffer(10); // 10 is test buffer len
696 EXPECT_EQ(SingleVerSerializeManager::Serialization(nullptr, buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
697 EXPECT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), nullptr), -E_MESSAGE_ID_ERROR);
698
699 /**
700 * @tc.steps: step2. call SingleVerSerializeManager::Serialization with invalid type message
701 * @tc.expected:step2 return -E_MESSAGE_ID_ERROR
702 */
703 EXPECT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
704
705 /**
706 * @tc.steps: step3. call SingleVerSerializeManager::DeSerialization with buffer = nullptr or msg = nullptr
707 * @tc.expected:step3 return -E_MESSAGE_ID_ERROR
708 */
709 EXPECT_EQ(SingleVerSerializeManager::DeSerialization(nullptr, buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
710 EXPECT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), nullptr), -E_MESSAGE_ID_ERROR);
711
712 /**
713 * @tc.steps: step4. call SingleVerSerializeManager::DeSerialization with invalid type message
714 * @tc.expected:step4 return -E_MESSAGE_ID_ERROR
715 */
716 EXPECT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
717 }
718
719 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, QueryAckPacketTest001, TestSize.Level1)
720 {
721 /**
722 * @tc.steps: step1. prepare a QuerySyncAckPacket.
723 */
724 DataAckPacket packet;
725 packet.SetVersion(SOFTWARE_VERSION_CURRENT);
726 packet.SetData(INT64_MAX);
727 packet.SetRecvCode(-E_NOT_SUPPORT);
728 std::vector<uint64_t> reserved = {INT8_MAX};
729 packet.SetReserved(reserved);
730
731 /**
732 * @tc.steps: step2. put the QuerySyncAckPacket into a message.
733 */
734 Message msg;
735 msg.SetCopiedObject(packet);
736 msg.SetMessageId(QUERY_SYNC_MESSAGE);
737 msg.SetMessageType(TYPE_RESPONSE);
738
739 /**
740 * @tc.steps: step3. Serialization the message to a buffer.
741 */
742 int len = SingleVerSerializeManager::CalculateLen(&msg);
743 LOGE("test leng = %d", len);
744 uint8_t *buffer = new (nothrow) uint8_t[len];
745 ASSERT_TRUE(buffer != nullptr);
746 int errCode = SingleVerSerializeManager::Serialization(buffer, len, &msg);
747 ASSERT_EQ(errCode, E_OK);
748
749 /**
750 * @tc.steps: step4. DeSerialization the buffer to a message.
751 */
752 Message outMsg;
753 outMsg.SetMessageId(QUERY_SYNC_MESSAGE);
754 outMsg.SetMessageType(TYPE_RESPONSE);
755 errCode = SingleVerSerializeManager::DeSerialization(buffer, len, &outMsg);
756 ASSERT_EQ(errCode, E_OK);
757
758 /**
759 * @tc.steps: step5. checkout the outMsg.
760 * @tc.expected: step5. outMsg equal the the in msg
761 */
762 auto outPacket = outMsg.GetObject<DataAckPacket>();
763 EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
764 EXPECT_EQ(outPacket->GetData(), static_cast<uint64_t>(INT64_MAX));
765 std::vector<uint64_t> reserved2 = {INT8_MAX};
766 EXPECT_EQ(outPacket->GetReserved(), reserved2);
767 EXPECT_EQ(outPacket->GetRecvCode(), -E_NOT_SUPPORT);
768 delete[] buffer;
769 }
770
771 /**
772 * @tc.name: GetQueryWaterMark 001
773 * @tc.desc: Test metaData save and get queryWaterMark.
774 * @tc.type: FUNC
775 * @tc.require: AR000FN6G9
776 * @tc.author: zhangqiquan
777 */
778 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark001, TestSize.Level1)
779 {
780 VirtualSingleVerSyncDBInterface storage;
781 Metadata meta;
782
783 /**
784 * @tc.steps: step1. initialize meta with storage
785 * @tc.expected: step1. E_OK
786 */
787 int errCode = meta.Initialize(&storage);
788 ASSERT_EQ(errCode, E_OK);
789
790 /**
791 * @tc.steps: step2. save receive and send watermark
792 * @tc.expected: step2. E_OK
793 */
794 WaterMark w1 = 1;
795 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w1), E_OK);
796 EXPECT_EQ(meta.SetSendQueryWaterMark("Q1", "D1", w1), E_OK);
797
798 /**
799 * @tc.steps: step3. get receive and send watermark
800 * @tc.expected: step3. E_OK and get the latest value
801 */
802 WaterMark w = 0;
803 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w), E_OK);
804 EXPECT_EQ(w1, w);
805 EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", w), E_OK);
806 EXPECT_EQ(w1, w);
807
808 /**
809 * @tc.steps: step4. set peer and local watermark
810 * @tc.expected: step4. E_OK
811 */
812 WaterMark w2 = 2;
813 EXPECT_EQ(meta.SaveLocalWaterMark("D1", w2), E_OK);
814 EXPECT_EQ(meta.SavePeerWaterMark("D1", w2, true), E_OK);
815
816 /**
817 * @tc.steps: step5. get receive and send watermark
818 * @tc.expected: step5. E_OK and get the w1
819 */
820 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w), E_OK);
821 EXPECT_EQ(w2, w);
822 EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", w), E_OK);
823 EXPECT_EQ(w2, w);
824
825 /**
826 * @tc.steps: step6. set peer and local watermark
827 * @tc.expected: step6. E_OK
828 */
829 WaterMark w3 = 3;
830 EXPECT_EQ(meta.SaveLocalWaterMark("D2", w3), E_OK);
831 EXPECT_EQ(meta.SavePeerWaterMark("D2", w3, true), E_OK);
832
833 /**
834 * @tc.steps: step7. get receive and send watermark
835 * @tc.expected: step7. E_OK and get the w3
836 */
837 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q2", "D2", w), E_OK);
838 EXPECT_EQ(w3, w);
839 EXPECT_EQ(meta.GetSendQueryWaterMark("Q2", "D2", w), E_OK);
840 EXPECT_EQ(w3, w);
841
842 /**
843 * @tc.steps: step8. get not exit receive and send watermark
844 * @tc.expected: step8. E_OK and get the 0
845 */
846 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q3", "D3", w), E_OK);
847 EXPECT_EQ(w, 0u);
848 EXPECT_EQ(meta.GetSendQueryWaterMark("Q3", "D3", w), E_OK);
849 EXPECT_EQ(w, 0u);
850 }
851
852 /**
853 * @tc.name: GetQueryWaterMark 002
854 * @tc.desc: Test metaData save and get queryWaterMark after push or pull mode.
855 * @tc.type: FUNC
856 * @tc.require: AR000FN6G9
857 * @tc.author: zhangqiquan
858 */
859 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark002, TestSize.Level1)
860 {
861 VirtualSingleVerSyncDBInterface storage;
862 Metadata meta;
863
864 /**
865 * @tc.steps: step1. initialize meta with storage
866 * @tc.expected: step1. E_OK
867 */
868 int errCode = meta.Initialize(&storage);
869 ASSERT_EQ(errCode, E_OK);
870
871 /**
872 * @tc.steps: step2. set peer and local watermark
873 * @tc.expected: step2. E_OK
874 */
875 WaterMark w1 = 2;
876 EXPECT_EQ(meta.SaveLocalWaterMark("D1", w1), E_OK);
877 EXPECT_EQ(meta.SavePeerWaterMark("D1", w1, true), E_OK);
878
879 /**
880 * @tc.steps: step2. save receive and send watermark
881 * @tc.expected: step2. E_OK
882 */
883 WaterMark w2 = 1;
884 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w2), E_OK);
885 EXPECT_EQ(meta.SetSendQueryWaterMark("Q1", "D1", w2), E_OK);
886
887 /**
888 * @tc.steps: step3. get receive and send watermark
889 * @tc.expected: step3. E_OK and get the bigger value
890 */
891 WaterMark w = 0;
892 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w), E_OK);
893 EXPECT_EQ(w1, w);
894 EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", w), E_OK);
895 EXPECT_EQ(w1, w);
896 }
897
898 /**
899 * @tc.name: ClearQueryWaterMark 001
900 * @tc.desc: Test metaData clear watermark function.
901 * @tc.type: FUNC
902 * @tc.require: AR000FN6G9
903 * @tc.author: zhangqiquan
904 */
905 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark001, TestSize.Level1)
906 {
907 VirtualSingleVerSyncDBInterface storage;
908 Metadata meta;
909
910 /**
911 * @tc.steps: step1. initialize meta with storage
912 * @tc.expected: step1. E_OK
913 */
914 int errCode = meta.Initialize(&storage);
915 ASSERT_EQ(errCode, E_OK);
916
917 /**
918 * @tc.steps: step2. save receive watermark
919 * @tc.expected: step2. E_OK
920 */
921 WaterMark w1 = 1;
922 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w1), E_OK);
923
924 /**
925 * @tc.steps: step3. erase peer watermark
926 * @tc.expected: step3. E_OK
927 */
928 EXPECT_EQ(meta.EraseDeviceWaterMark("D1", true), E_OK);
929
930 /**
931 * @tc.steps: step4. get receive watermark
932 * @tc.expected: step4. E_OK receive watermark is zero
933 */
934 WaterMark w2 = -1;
935 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w2), E_OK);
936 EXPECT_EQ(w2, 0u);
937
938 /**
939 * @tc.steps: step5. set peer watermark
940 * @tc.expected: step5. E_OK
941 */
942 WaterMark w3 = 2;
943 EXPECT_EQ(meta.SavePeerWaterMark("D1", w3, true), E_OK);
944
945 /**
946 * @tc.steps: step6. get receive watermark
947 * @tc.expected: step6. E_OK receive watermark is peer watermark
948 */
949 WaterMark w4 = -1;
950 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w4), E_OK);
951 EXPECT_EQ(w4, w3);
952 }
953
954 /**
955 * @tc.name: ClearQueryWaterMark 002
956 * @tc.desc: Test metaData clear watermark function.
957 * @tc.type: FUNC
958 * @tc.require: AR000FN6G9
959 * @tc.author: zhangqiquan
960 */
961 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark002, TestSize.Level1)
962 {
963 VirtualSingleVerSyncDBInterface storage;
964 Metadata meta;
965
966 /**
967 * @tc.steps: step1. initialize meta with storage
968 * @tc.expected: step1. E_OK
969 */
970 int errCode = meta.Initialize(&storage);
971 ASSERT_EQ(errCode, E_OK);
972
973 /**
974 * @tc.steps: step2. save receive watermark
975 * @tc.expected: step2. E_OK
976 */
977 WaterMark w1 = 1;
978 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w1), E_OK);
979 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q2", "D1", w1), E_OK);
980 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D2", w1), E_OK);
981
982 /**
983 * @tc.steps: step3. erase peer watermark, make sure data remove in db
984 * @tc.expected: step3. E_OK
985 */
986 Metadata anotherMeta;
987 ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
988 EXPECT_EQ(anotherMeta.EraseDeviceWaterMark("D1", true), E_OK);
989
990 /**
991 * @tc.steps: step4. get receive watermark
992 * @tc.expected: step4. E_OK receive watermark is zero
993 */
994 WaterMark w2 = -1;
995 EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q1", "D1", w2), E_OK);
996 EXPECT_EQ(w2, 0u);
997 w2 = -1;
998 EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q2", "D1", w2), E_OK);
999 EXPECT_EQ(w2, 0u);
1000 w2 = -1;
1001 EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q1", "D2", w2), E_OK);
1002 EXPECT_EQ(w2, w1);
1003 }
1004
1005 /**
1006 * @tc.name: GetQueryLastTimestamp001
1007 * @tc.desc: Test function of GetQueryLastTimestamp.
1008 * @tc.type: FUNC
1009 * @tc.require: AR000FN6G9
1010 * @tc.author: zhangshijie
1011 */
1012 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryLastTimestamp001, TestSize.Level1)
1013 {
1014 /**
1015 * @tc.steps: step1. initialize meta with nullptr
1016 * @tc.expected: step1. return -E_INVALID_DB
1017 */
1018 Metadata meta;
1019 EXPECT_EQ(meta.Initialize(nullptr), -E_INVALID_DB);
1020
1021 /**
1022 * @tc.steps: step2. initialize meta with storage
1023 * @tc.expected: step2. E_OK
1024 */
1025 VirtualSingleVerSyncDBInterface storage;
1026 int errCode = meta.Initialize(&storage);
1027 ASSERT_EQ(errCode, E_OK);
1028
1029 /**
1030 * @tc.steps: step3. call GetQueryLastTimestamp with a non-exists device
1031 * @tc.expected: step3. return INT64_MAX
1032 */
1033 EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q1"), static_cast<uint64_t>(INT64_MAX));
1034
1035 /**
1036 * @tc.steps: step4. call GetQueryLastTimestamp with device D1 again
1037 * @tc.expected: step4. return 0
1038 */
1039 EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q1"), 0u);
1040
1041 /**
1042 * @tc.steps: step5. call GetQueryLastTimestamp with device D1 and Q2
1043 * @tc.expected: step5. return INT64_MAX
1044 */
1045 EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q2"), static_cast<uint64_t>(INT64_MAX));
1046 }
1047
1048 /**
1049 * @tc.name: MetaDataExceptionBranch001
1050 * @tc.desc: Test execption branch of meata data.
1051 * @tc.type: FUNC
1052 * @tc.require: AR000FN6G9
1053 * @tc.author: zhangshijie
1054 */
1055 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, MetaDataExceptionBranch001, TestSize.Level1)
1056 {
1057 /**
1058 * @tc.steps: step1. call GetRemoveDataMark with a device not in map
1059 * @tc.expected: step1. out value = 0
1060 */
1061 Metadata meta;
1062 uint64_t val = 99; // 99 is the initial value of outValue
1063 uint64_t outValue = val;
1064 meta.GetRemoveDataMark("D1", outValue);
1065 EXPECT_EQ(outValue, 0u);
1066
1067 /**
1068 * @tc.steps: step2. reset outValue, call GetDbCreateTime with a device not in map
1069 * @tc.expected: step2. out value = 0
1070 */
1071 outValue = val;
1072 meta.GetDbCreateTime("D1", outValue);
1073 EXPECT_EQ(outValue, 0u);
1074
1075 /**
1076 * @tc.steps: step3. call ResetMetaDataAfterRemoveData with a device not in map
1077 * @tc.expected: step3. return -E_NOT_FOUND
1078 */
1079 EXPECT_EQ(meta.ResetMetaDataAfterRemoveData("D1"), -E_NOT_FOUND);
1080 }
1081
1082 /**
1083 * @tc.name: GetDeleteKeyWaterMark 001
1084 * @tc.desc: Test metaData save and get deleteWaterMark.
1085 * @tc.type: FUNC
1086 * @tc.require: AR000FN6G9
1087 * @tc.author: zhangqiquan
1088 */
1089 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteKeyWaterMark001, TestSize.Level1)
1090 {
1091 VirtualSingleVerSyncDBInterface storage;
1092 Metadata meta;
1093
1094 /**
1095 * @tc.steps: step1. initialize meta with storage
1096 * @tc.expected: step1. E_OK
1097 */
1098 int errCode = meta.Initialize(&storage);
1099 ASSERT_EQ(errCode, E_OK);
1100
1101 /**
1102 * @tc.steps: step2. save receive and send watermark
1103 * @tc.expected: step2. E_OK
1104 */
1105 WaterMark w1 = 1;
1106 EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", w1), E_OK);
1107 EXPECT_EQ(meta.SetSendDeleteSyncWaterMark("D1", w1), E_OK);
1108
1109 /**
1110 * @tc.steps: step3. get receive and send watermark
1111 * @tc.expected: step3. E_OK and get the latest value
1112 */
1113 WaterMark w = 0;
1114 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w), E_OK);
1115 EXPECT_EQ(w1, w);
1116 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", w), E_OK);
1117 EXPECT_EQ(w1, w);
1118
1119 /**
1120 * @tc.steps: step4. set peer and local watermark
1121 * @tc.expected: step4. E_OK
1122 */
1123 WaterMark w2 = 2;
1124 EXPECT_EQ(meta.SaveLocalWaterMark("D1", w2), E_OK);
1125 EXPECT_EQ(meta.SavePeerWaterMark("D1", w2, true), E_OK);
1126
1127 /**
1128 * @tc.steps: step5. get receive and send watermark
1129 * @tc.expected: step5. E_OK and get the w1
1130 */
1131 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w), E_OK);
1132 EXPECT_EQ(w2, w);
1133 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", w), E_OK);
1134 EXPECT_EQ(w2, w);
1135
1136 /**
1137 * @tc.steps: step6. set peer and local watermark
1138 * @tc.expected: step6. E_OK
1139 */
1140 WaterMark w3 = 3;
1141 EXPECT_EQ(meta.SaveLocalWaterMark("D2", w3), E_OK);
1142 EXPECT_EQ(meta.SavePeerWaterMark("D2", w3, true), E_OK);
1143
1144 /**
1145 * @tc.steps: step7. get receive and send watermark
1146 * @tc.expected: step7. E_OK and get the w3
1147 */
1148 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D2", w), E_OK);
1149 EXPECT_EQ(w3, w);
1150 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D2", w), E_OK);
1151 EXPECT_EQ(w3, w);
1152
1153 /**
1154 * @tc.steps: step8. get not exit receive and send watermark
1155 * @tc.expected: step8. E_OK and get the 0
1156 */
1157 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D3", w), E_OK);
1158 EXPECT_EQ(w, 0u);
1159 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D3", w), E_OK);
1160 EXPECT_EQ(w, 0u);
1161 }
1162
1163 /**
1164 * @tc.name: GetDeleteKeyWaterMark 002
1165 * @tc.desc: Test metaData save and get deleteWaterMark after push or pull mode.
1166 * @tc.type: FUNC
1167 * @tc.require: AR000FN6G9
1168 * @tc.author: zhangqiquan
1169 */
1170 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteKeyWaterMark002, TestSize.Level1)
1171 {
1172 VirtualSingleVerSyncDBInterface storage;
1173 Metadata meta;
1174
1175 /**
1176 * @tc.steps: step1. initialize meta with storage
1177 * @tc.expected: step1. E_OK
1178 */
1179 int errCode = meta.Initialize(&storage);
1180 ASSERT_EQ(errCode, E_OK);
1181
1182 /**
1183 * @tc.steps: step2. set peer and local watermark
1184 * @tc.expected: step2. E_OK
1185 */
1186 WaterMark w1 = 3;
1187 EXPECT_EQ(meta.SaveLocalWaterMark("D1", w1), E_OK);
1188 EXPECT_EQ(meta.SavePeerWaterMark("D1", w1, true), E_OK);
1189
1190 /**
1191 * @tc.steps: step2. save receive and send watermark
1192 * @tc.expected: step2. E_OK
1193 */
1194 WaterMark w2 = 1;
1195 EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", w2), E_OK);
1196 EXPECT_EQ(meta.SetSendDeleteSyncWaterMark("D1", w2), E_OK);
1197
1198 /**
1199 * @tc.steps: step3. get receive and send watermark
1200 * @tc.expected: step3. E_OK and get the bigger value
1201 */
1202 WaterMark w = 0;
1203 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w), E_OK);
1204 EXPECT_EQ(w1, w);
1205 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", w), E_OK);
1206 EXPECT_EQ(w1, w);
1207 }
1208
1209 /**
1210 * @tc.name: ClearDeleteKeyWaterMark 001
1211 * @tc.desc: Test metaData clear watermark function.
1212 * @tc.type: FUNC
1213 * @tc.require: AR000FN6G9
1214 * @tc.author: zhangqiquan
1215 */
1216 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearDeleteKeyWaterMark001, TestSize.Level1)
1217 {
1218 VirtualSingleVerSyncDBInterface storage;
1219 Metadata meta;
1220
1221 /**
1222 * @tc.steps: step1. initialize meta with storage
1223 * @tc.expected: step1. E_OK
1224 */
1225 int errCode = meta.Initialize(&storage);
1226 ASSERT_EQ(errCode, E_OK);
1227
1228 /**
1229 * @tc.steps: step2. save receive watermark
1230 * @tc.expected: step2. E_OK
1231 */
1232 WaterMark w1 = 1;
1233 EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", w1), E_OK);
1234
1235 /**
1236 * @tc.steps: step3. erase peer watermark
1237 * @tc.expected: step3. E_OK
1238 */
1239 EXPECT_EQ(meta.EraseDeviceWaterMark("D1", true), E_OK);
1240
1241 /**
1242 * @tc.steps: step4. get receive watermark
1243 * @tc.expected: step4. E_OK receive watermark is zero
1244 */
1245 WaterMark w2 = -1;
1246 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w2), E_OK);
1247 EXPECT_EQ(w2, 0u);
1248
1249 /**
1250 * @tc.steps: step5. set peer watermark
1251 * @tc.expected: step5. E_OK
1252 */
1253 WaterMark w3 = 2;
1254 EXPECT_EQ(meta.SavePeerWaterMark("D1", w3, true), E_OK);
1255
1256 /**
1257 * @tc.steps: step6. get receive watermark
1258 * @tc.expected: step6. E_OK receive watermark is peer watermark
1259 */
1260 WaterMark w4 = -1;
1261 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w4), E_OK);
1262 EXPECT_EQ(w4, w3);
1263 }
1264
1265 /**
1266 * @tc.name: VerifyCacheAndDb 001
1267 * @tc.desc: Test metaData watermark cache and db are consistent and correct.
1268 * @tc.type: FUNC
1269 * @tc.require: AR000FN6G9
1270 * @tc.author: zhangqiquan
1271 */
1272 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDataQuerySync001, TestSize.Level1)
1273 {
1274 Metadata meta;
1275 VirtualSingleVerSyncDBInterface storage;
1276
1277 /**
1278 * @tc.steps: step1. initialize meta with storage
1279 * @tc.expected: step1. E_OK
1280 */
1281 int errCode = meta.Initialize(&storage);
1282 ASSERT_EQ(errCode, E_OK);
1283
1284 const std::string deviceId = "D1";
1285 const std::string queryId = "Q1";
1286
1287 /**
1288 * @tc.steps: step2. save deleteSync watermark
1289 * @tc.expected: step2. E_OK
1290 */
1291 WaterMark deleteWaterMark = 1;
1292 EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark(deviceId, deleteWaterMark), E_OK);
1293 EXPECT_EQ(meta.SetSendDeleteSyncWaterMark(deviceId, deleteWaterMark), E_OK);
1294
1295 /**
1296 * @tc.steps: step3. save querySync watermark
1297 * @tc.expected: step2. E_OK
1298 */
1299 WaterMark queryWaterMark = 2;
1300 EXPECT_EQ(meta.SetRecvQueryWaterMark(queryId, deviceId, queryWaterMark), E_OK);
1301 EXPECT_EQ(meta.SetSendQueryWaterMark(queryId, deviceId, queryWaterMark), E_OK);
1302
1303 /**
1304 * @tc.steps: step4. initialize meta with storage
1305 * @tc.expected: step4. E_OK
1306 */
1307 Metadata anotherMeta;
1308 ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1309
1310 /**
1311 * @tc.steps: step5. verify delete sync data
1312 * @tc.expected: step5. E_OK and waterMark equal to deleteWaterMark
1313 */
1314 WaterMark waterMark;
1315 EXPECT_EQ(anotherMeta.GetRecvDeleteSyncWaterMark(deviceId, waterMark), E_OK);
1316 EXPECT_EQ(waterMark, deleteWaterMark);
1317 EXPECT_EQ(anotherMeta.GetSendDeleteSyncWaterMark(deviceId, waterMark), E_OK);
1318 EXPECT_EQ(waterMark, deleteWaterMark);
1319
1320 /**
1321 * @tc.steps: step6. verify query sync data
1322 * @tc.expected: step6. E_OK and waterMark equal to queryWaterMark
1323 */
1324 EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark(queryId, deviceId, waterMark), E_OK);
1325 EXPECT_EQ(waterMark, queryWaterMark);
1326 EXPECT_EQ(anotherMeta.GetSendQueryWaterMark(queryId, deviceId, waterMark), E_OK);
1327 EXPECT_EQ(waterMark, queryWaterMark);
1328 }
1329
1330 /**
1331 * @tc.name: VerifyLruMap 001
1332 * @tc.desc: Test metaData watermark cache lru ability.
1333 * @tc.type: FUNC
1334 * @tc.require: AR000FN6G9
1335 * @tc.author: zhangqiquan
1336 */
1337 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyLruMap001, TestSize.Level1)
1338 {
1339 LruMap<std::string, QueryWaterMark> lruMap;
1340 const int maxCacheItems = 200;
1341
1342 /**
1343 * @tc.steps: step1. fill items to LruMap
1344 * @tc.expected: step1. E_OK
1345 */
1346 const int startCount = 0;
1347 for (int i = startCount; i < maxCacheItems; i++) {
1348 std::string key = std::to_string(i);
1349 QueryWaterMark value;
1350 value.recvWaterMark = i + 1;
1351 EXPECT_EQ(lruMap.Put(key, value), E_OK);
1352 }
1353
1354 /**
1355 * @tc.steps: step2. get the first item
1356 * @tc.expected: step2. E_OK first item will move to last
1357 */
1358 std::string firstItemKey = std::to_string(startCount);
1359 QueryWaterMark firstItemValue;
1360 EXPECT_EQ(lruMap.Get(firstItemKey, firstItemValue), E_OK);
1361 EXPECT_EQ(firstItemValue.recvWaterMark, 1u);
1362
1363 /**
1364 * @tc.steps: step3. insert new items to LruMap
1365 * @tc.expected: step3. the second items was removed
1366 */
1367 std::string key = std::to_string(maxCacheItems);
1368 QueryWaterMark value;
1369 value.recvWaterMark = maxCacheItems;
1370 EXPECT_EQ(lruMap.Put(key, value), E_OK);
1371
1372 /**
1373 * @tc.steps: step4. get the second item
1374 * @tc.expected: step4. E_NOT_FOUND it was removed by algorithm
1375 */
1376 std::string secondItemKey = std::to_string(startCount + 1);
1377 QueryWaterMark secondItemValue;
1378 EXPECT_EQ(lruMap.Get(secondItemKey, secondItemValue), -E_NOT_FOUND);
1379 EXPECT_EQ(secondItemValue.recvWaterMark, 0u);
1380 }
1381
1382 /**
1383 * @tc.name: VerifyMetaDataInit 001
1384 * @tc.desc: Test metaData init correctly
1385 * @tc.type: FUNC
1386 * @tc.require: AR000FN6G9
1387 * @tc.author: zhangqiquan
1388 */
1389 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDataInit001, TestSize.Level1)
1390 {
1391 Metadata meta;
1392 VirtualSingleVerSyncDBInterface storage;
1393
1394 /**
1395 * @tc.steps: step1. initialize meta with storage
1396 * @tc.expected: step1. E_OK
1397 */
1398 ASSERT_EQ(meta.Initialize(&storage), E_OK);
1399
1400 DeviceID deviceA = "DeviceA";
1401 DeviceID deviceB = "DeviceA";
1402 WaterMark setWaterMark = 1;
1403
1404 /**
1405 * @tc.steps: step2. meta save and get waterMark
1406 * @tc.expected: step2. expect get the same waterMark
1407 */
1408 EXPECT_EQ(meta.SaveLocalWaterMark(deviceA, setWaterMark), E_OK);
1409 EXPECT_EQ(meta.SaveLocalWaterMark(deviceB, setWaterMark), E_OK);
1410 WaterMark getWaterMark = 0;
1411 meta.GetLocalWaterMark(deviceA, getWaterMark);
1412 EXPECT_EQ(getWaterMark, setWaterMark);
1413 meta.GetLocalWaterMark(deviceB, getWaterMark);
1414 EXPECT_EQ(getWaterMark, setWaterMark);
1415
1416
1417 /**
1418 * @tc.steps: step3. init again
1419 * @tc.expected: step3. E_OK
1420 */
1421 Metadata anotherMeta;
1422 ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1423
1424 /**
1425 * @tc.steps: step4. get waterMark again
1426 * @tc.expected: step4. expect get the same waterMark
1427 */
1428 anotherMeta.GetLocalWaterMark(deviceA, getWaterMark);
1429 EXPECT_EQ(getWaterMark, setWaterMark);
1430 anotherMeta.GetLocalWaterMark(deviceB, getWaterMark);
1431 EXPECT_EQ(getWaterMark, setWaterMark);
1432 }
1433
1434 namespace {
InitVerifyStorageEnvironment(Metadata & meta,VirtualSingleVerSyncDBInterface & storage,const std::string & deviceId,const int & startCount,const uint32_t & maxStoreItems)1435 void InitVerifyStorageEnvironment(Metadata &meta, VirtualSingleVerSyncDBInterface &storage,
1436 const std::string &deviceId, const int &startCount, const uint32_t &maxStoreItems)
1437 {
1438 /**
1439 * @tc.steps: step1. initialize meta with storage
1440 * @tc.expected: step1. E_OK
1441 */
1442 ASSERT_EQ(meta.Initialize(&storage), E_OK);
1443
1444 /**
1445 * @tc.steps: step2. fill items to metadata
1446 * @tc.expected: step2. E_OK
1447 */
1448 for (uint32_t i = startCount; i < maxStoreItems; i++) {
1449 std::string queryId = std::to_string(i);
1450 WaterMark recvWaterMark = i + 1;
1451 EXPECT_EQ(meta.SetRecvQueryWaterMark(queryId, deviceId, recvWaterMark), E_OK);
1452 }
1453 }
1454 }
1455
1456 /**
1457 * @tc.name: VerifyManagerQuerySyncStorage 001
1458 * @tc.desc: Test metaData remove least used querySync storage items.
1459 * @tc.type: FUNC
1460 * @tc.require: AR000FN6G9
1461 * @tc.author: zhangqiquan
1462 */
1463 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyManagerQuerySyncStorage001, TestSize.Level3)
1464 {
1465 Metadata meta;
1466 VirtualSingleVerSyncDBInterface storage;
1467 const uint32_t maxStoreItems = 100000;
1468 const int startCount = 0;
1469 const std::string deviceId = "Device";
1470
1471 InitVerifyStorageEnvironment(meta, storage, deviceId, startCount, maxStoreItems);
1472
1473 /**
1474 * @tc.steps: step3. insert new items to metadata
1475 * @tc.expected: step3. E_OK
1476 */
1477 std::string newQueryId = std::to_string(maxStoreItems);
1478 WaterMark newWaterMark = maxStoreItems + 1;
1479 EXPECT_EQ(meta.SetRecvQueryWaterMark(newQueryId, deviceId, newWaterMark), E_OK);
1480
1481 /**
1482 * @tc.steps: step4. touch the first item
1483 * @tc.expected: step4. E_OK update first item used time
1484 */
1485 std::string firstItemKey = std::to_string(startCount);
1486 WaterMark firstWaterMark = 11u;
1487 EXPECT_EQ(meta.SetRecvQueryWaterMark(firstItemKey, deviceId, firstWaterMark), E_OK);
1488
1489 /**
1490 * @tc.steps: step5. initialize new meta with storage
1491 * @tc.expected: step5. the second item will be removed
1492 */
1493 Metadata newMeta;
1494 ASSERT_EQ(newMeta.Initialize(&storage), E_OK);
1495
1496 /**
1497 * @tc.steps: step6. touch the first item
1498 * @tc.expected: step6. E_OK it still exist
1499 */
1500 WaterMark exceptWaterMark;
1501 EXPECT_EQ(newMeta.GetRecvQueryWaterMark(firstItemKey, deviceId, exceptWaterMark), E_OK);
1502 EXPECT_EQ(exceptWaterMark, firstWaterMark);
1503
1504 /**
1505 * @tc.steps: step7. get the second item
1506 * @tc.expected: step7. NOT_FOUND secondWaterMark is zero
1507 */
1508 WaterMark secondWaterMark;
1509 std::string secondQueryId = std::to_string(startCount + 1);
1510 EXPECT_EQ(newMeta.GetRecvQueryWaterMark(secondQueryId, deviceId, secondWaterMark), E_OK);
1511 EXPECT_EQ(secondWaterMark, 0u);
1512 }
1513
1514 /**
1515 * @tc.name: VerifyMetaDbCreateTime 001
1516 * @tc.desc: Test metaData get and set cbCreateTime.
1517 * @tc.type: FUNC
1518 * @tc.require: AR000FN6G9
1519 * @tc.author: zhuwentao
1520 */
1521 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDbCreateTime001, TestSize.Level1)
1522 {
1523 Metadata meta;
1524 VirtualSingleVerSyncDBInterface storage;
1525 /**
1526 * @tc.steps: step1. initialize meta with storage
1527 * @tc.expected: step1. E_OK
1528 */
1529 int errCode = meta.Initialize(&storage);
1530 ASSERT_EQ(errCode, E_OK);
1531 /**
1532 * @tc.steps: step2. set local and peer watermark and dbCreateTime
1533 * @tc.expected: step4. E_OK
1534 */
1535 WaterMark value = 2;
1536 EXPECT_EQ(meta.SaveLocalWaterMark("D1", value), E_OK);
1537 EXPECT_EQ(meta.SavePeerWaterMark("D1", value, true), E_OK);
1538 EXPECT_EQ(meta.SetDbCreateTime("D1", 10u, true), E_OK);
1539 /**
1540 * @tc.steps: step3. check peer and local watermark and dbCreateTime
1541 * @tc.expected: step4. E_OK
1542 */
1543 WaterMark curValue = 0;
1544 meta.GetLocalWaterMark("D1", curValue);
1545 EXPECT_EQ(value, curValue);
1546 meta.GetPeerWaterMark("D1", curValue);
1547 EXPECT_EQ(value, curValue);
1548 uint64_t curDbCreatTime = 0;
1549 meta.GetDbCreateTime("D1", curDbCreatTime);
1550 EXPECT_EQ(curDbCreatTime, 10u);
1551 /**
1552 * @tc.steps: step3. change dbCreateTime and check
1553 * @tc.expected: step4. E_OK
1554 */
1555 EXPECT_EQ(meta.SetDbCreateTime("D1", 20u, true), E_OK);
1556 uint64_t clearDeviceDataMark = INT_MAX;
1557 meta.GetRemoveDataMark("D1", clearDeviceDataMark);
1558 EXPECT_EQ(clearDeviceDataMark, 1u);
1559 EXPECT_EQ(meta.ResetMetaDataAfterRemoveData("D1"), E_OK);
1560 meta.GetRemoveDataMark("D1", clearDeviceDataMark);
1561 EXPECT_EQ(clearDeviceDataMark, 0u);
1562 meta.GetDbCreateTime("D1", curDbCreatTime);
1563 EXPECT_EQ(curDbCreatTime, 20u);
1564 }
1565
1566 /**
1567 * @tc.name: VerifyManagerQuerySyncStorage 002
1568 * @tc.desc: Test metaData remove least used querySync storage items when exit wrong data.
1569 * @tc.type: FUNC
1570 * @tc.require: AR000FN6G9
1571 * @tc.author: zhangqiquan
1572 */
1573 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyManagerQuerySyncStorage002, TestSize.Level3)
1574 {
1575 Metadata meta;
1576 VirtualSingleVerSyncDBInterface storage;
1577 const uint32_t maxStoreItems = 100000;
1578 const int startCount = 0;
1579 const std::string deviceId = "Device";
1580
1581 InitVerifyStorageEnvironment(meta, storage, deviceId, startCount, maxStoreItems);
1582
1583 /**
1584 * @tc.steps: step3. insert a wrong Value
1585 * @tc.expected: step3. E_OK
1586 */
1587 std::string newQueryId = std::to_string(maxStoreItems);
1588 Key dbKey;
1589 DBCommon::StringToVector(QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey()
1590 + DBCommon::TransferHashString(deviceId) + newQueryId, dbKey);
1591 Value wrongValue;
1592 EXPECT_EQ(storage.PutMetaData(dbKey, wrongValue), E_OK);
1593
1594 /**
1595 * @tc.steps: step4. initialize new meta with storage
1596 * @tc.expected: step4. E_OK
1597 */
1598 Metadata newMeta;
1599 ASSERT_EQ(newMeta.Initialize(&storage), E_OK);
1600
1601 /**
1602 * @tc.steps: step5. touch the first item
1603 * @tc.expected: step5. E_OK still exit
1604 */
1605 std::string firstItemKey = std::to_string(startCount);
1606 WaterMark exceptWaterMark;
1607 EXPECT_EQ(newMeta.GetRecvQueryWaterMark(firstItemKey, deviceId, exceptWaterMark), E_OK);
1608 EXPECT_EQ(exceptWaterMark, 1u);
1609 }
1610
1611 /**
1612 * @tc.name: AllPredicateQuerySync001
1613 * @tc.desc: Test normal push sync for AllPredicate data.
1614 * @tc.type: FUNC
1615 * @tc.require: AR000FN6G9
1616 * @tc.author: zhuwentao
1617 */
1618 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync001, TestSize.Level1)
1619 {
1620 /**
1621 * @tc.steps: step1. InitSchemaDb
1622 */
1623 InitSchemaDb();
1624 DBStatus status = OK;
1625 std::vector<std::string> devices;
1626 devices.push_back(g_deviceB->GetDeviceId());
1627
1628 /**
1629 * @tc.steps: step2. deviceA put {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1630 {key21, SCHEMA_VALUE2} - {key29, SCHEMA_VALUE2}
1631 */
1632 Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1633 Value value2(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end());
1634 Key key = {'1'};
1635 Key key2 = {'2'};
1636 const int dataSize = 4000;
1637 for (int i = 0; i < dataSize; i++) {
1638 key.push_back(i);
1639 key2.push_back(i);
1640 status = g_schemaKvDelegatePtr->Put(key, value);
1641 ASSERT_TRUE(status == OK);
1642 status = g_schemaKvDelegatePtr->Put(key2, value2);
1643 ASSERT_TRUE(status == OK);
1644 key.pop_back();
1645 key2.pop_back();
1646 }
1647 ASSERT_TRUE(status == OK);
1648
1649 /**
1650 * @tc.steps: step3. deviceA call query sync and wait
1651 * @tc.expected: step3. sync should return OK.
1652 */
1653 Query query = Query::Select().EqualTo("$.field_name1", 1);
1654 std::map<std::string, DBStatus> result;
1655 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1656 ASSERT_TRUE(status == OK);
1657
1658 /**
1659 * @tc.expected: step4. onComplete should be called, DeviceB have {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1660 */
1661 ASSERT_TRUE(result.size() == devices.size());
1662 for (const auto &pair : result) {
1663 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1664 EXPECT_TRUE(pair.second == OK);
1665 }
1666 VirtualDataItem item;
1667 VirtualDataItem item2;
1668 for (int i = 0; i < dataSize; i++) {
1669 key.push_back(i);
1670 key2.push_back(i);
1671 g_deviceB->GetData(key, item);
1672 EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
1673 EXPECT_TRUE(item.value == value);
1674 key.pop_back();
1675 key2.pop_back();
1676 }
1677 }
1678
1679 /**
1680 * @tc.name: AllPredicateQuerySync002
1681 * @tc.desc: Test wrong query param push sync for AllPredicate data.
1682 * @tc.type: FUNC
1683 * @tc.require: AR000FN6G9
1684 * @tc.author: zhuwentao
1685 */
1686 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync002, TestSize.Level1)
1687 {
1688 /**
1689 * @tc.steps: step1. InitSchemaDb
1690 */
1691 InitSchemaDb();
1692 DBStatus status = OK;
1693 std::vector<std::string> devices;
1694 devices.push_back(g_deviceB->GetDeviceId());
1695
1696 /**
1697 * @tc.steps: step2. deviceA call query sync and wait
1698 * @tc.expected: step2. sync should return INVALID_QUERY_FIELD
1699 */
1700 Query query = Query::Select().GreaterThan("field_name11", 10);
1701 std::map<std::string, DBStatus> result;
1702 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1703 ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1704 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, query);
1705 ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1706 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
1707 ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1708 }
1709
1710 /**
1711 * @tc.name: AllPredicateQuerySync003
1712 * @tc.desc: Test normal push sync for AllPredicate data with limit
1713 * @tc.type: FUNC
1714 * @tc.require: AR000FN6G9
1715 * @tc.author: zhuwentao
1716 */
1717 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync003, TestSize.Level1)
1718 {
1719 /**
1720 * @tc.steps: step1. InitSchemaDb
1721 */
1722 InitSchemaDb();
1723 DBStatus status = OK;
1724 std::vector<std::string> devices;
1725 devices.push_back(g_deviceB->GetDeviceId());
1726
1727 /**
1728 * @tc.steps: step2. deviceA put {key1, SCHEMA_VALUE1} - {key9, SCHEMA_VALUE1}
1729 */
1730 Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1731 Value value2(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end());
1732 Key key = {'1'};
1733 Key key2 = {'2'};
1734 const int dataSize = 10;
1735 for (int i = 0; i < dataSize; i++) {
1736 key.push_back(i);
1737 key2.push_back(i);
1738 status = g_schemaKvDelegatePtr->Put(key, value);
1739 ASSERT_TRUE(status == OK);
1740 status = g_schemaKvDelegatePtr->Put(key2, value2);
1741 ASSERT_TRUE(status == OK);
1742 key.pop_back();
1743 key2.pop_back();
1744 }
1745 ASSERT_TRUE(status == OK);
1746
1747 /**
1748 * @tc.steps: step3. deviceA call query sync with limit and wait
1749 * @tc.expected: step3. sync should return OK.
1750 */
1751 Query query = Query::Select().EqualTo("$.field_name1", 1).Limit(20, 0);
1752 std::map<std::string, DBStatus> result;
1753 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1754 ASSERT_TRUE(status == OK);
1755
1756 /**
1757 * @tc.expected: step4. onComplete should be called, DeviceB have {key1, SCHEMA_VALUE1} - {key9, SCHEMA_VALUE1}
1758 */
1759 ASSERT_TRUE(result.size() == devices.size());
1760 for (const auto &pair : result) {
1761 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1762 EXPECT_TRUE(pair.second == OK);
1763 }
1764 VirtualDataItem item;
1765 VirtualDataItem item2;
1766 for (int i = 0; i < dataSize; i++) {
1767 key.push_back(i);
1768 key2.push_back(i);
1769 g_deviceB->GetData(key, item);
1770 EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
1771 EXPECT_TRUE(item.value == value);
1772 key.pop_back();
1773 key2.pop_back();
1774 }
1775 }
1776
1777 /**
1778 * @tc.name: AllPredicateQuerySync004
1779 * @tc.desc: Test normal pull sync for AllPredicate data.
1780 * @tc.type: FUNC
1781 * @tc.require: AR000FN6G9
1782 * @tc.author: zhuwentao
1783 */
1784 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync004, TestSize.Level1)
1785 {
1786 /**
1787 * @tc.steps: step1. InitSchemaDb
1788 */
1789 InitSchemaDb();
1790 DBStatus status = OK;
1791 std::vector<std::string> devices;
1792 devices.push_back(g_deviceB->GetDeviceId());
1793
1794 /**
1795 * @tc.steps: step2. deviceB put {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1796 */
1797 Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1798 Key key = {'1'};
1799 const int dataSize = 10;
1800 for (int i = 0; i < dataSize; i++) {
1801 key.push_back(i);
1802 g_deviceB->PutData(key, value, 10 + i, 0);
1803 ASSERT_TRUE(status == OK);
1804 key.pop_back();
1805 }
1806 ASSERT_TRUE(status == OK);
1807
1808 /**
1809 * @tc.steps: step3. deviceA call query sync and wait
1810 * @tc.expected: step3. sync should return OK.
1811 */
1812 Query query = Query::Select().EqualTo("$.field_name1", 1);
1813 std::map<std::string, DBStatus> result;
1814 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
1815 ASSERT_TRUE(status == OK);
1816
1817 /**
1818 * @tc.expected: step4. onComplete should be called, DeviceA have {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1819 */
1820 ASSERT_TRUE(result.size() == devices.size());
1821 for (const auto &pair : result) {
1822 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1823 EXPECT_TRUE(pair.second == OK);
1824 }
1825 Value item;
1826 Value item2;
1827 for (int i = 0; i < dataSize; i++) {
1828 key.push_back(i);
1829 g_schemaKvDelegatePtr->Get(key, item);
1830 EXPECT_TRUE(item == value);
1831 key.pop_back();
1832 }
1833 }