1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17 #include <thread>
18
19 #include "db_constant.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "kv_store_nb_delegate.h"
23 #include "kv_virtual_device.h"
24 #include "platform_specific.h"
25 #include "query.h"
26 #include "query_sync_object.h"
27 #include "runtime_config.h"
28 #include "single_ver_data_sync.h"
29 #include "single_ver_serialize_manager.h"
30 #include "subscribe_manager.h"
31 #include "subscribe_recorder.h"
32 #include "sync_types.h"
33
34 using namespace testing::ext;
35 using namespace DistributedDB;
36 using namespace DistributedDBUnitTest;
37 using namespace std;
38
39 namespace {
40 string g_testDir;
41 const string SCHEMA_STORE_ID = "kv_store_sync_schema_test";
42 const std::string DEVICE_A = "deviceA";
43 const std::string DEVICE_B = "deviceB";
44 const std::string DEVICE_C = "deviceC";
45
46 KvStoreDelegateManager g_schemaMgr(SCHEMA_APP_ID, USER_ID);
47 KvStoreConfig g_config;
48 DistributedDBToolsUnitTest g_tool;
49 DBStatus g_schemaKvDelegateStatus = INVALID_ARGS;
50 KvStoreNbDelegate* g_schemaKvDelegatePtr = nullptr;
51 VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
52 KvVirtualDevice* g_deviceB = nullptr;
53 KvVirtualDevice* g_deviceC = nullptr;
54
55 // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
56 auto g_schemaKvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
57 placeholders::_1, placeholders::_2, std::ref(g_schemaKvDelegateStatus), std::ref(g_schemaKvDelegatePtr));
58 const string SCHEMA_STRING =
59 "{\"SCHEMA_VERSION\":\"1.0\","
60 "\"SCHEMA_MODE\":\"STRICT\","
61 "\"SCHEMA_DEFINE\":{"
62 "\"field_name1\":\"BOOL\","
63 "\"field_name2\":\"BOOL\","
64 "\"field_name3\":\"INTEGER, NOT NULL\","
65 "\"field_name4\":\"LONG, DEFAULT 100\","
66 "\"field_name5\":\"DOUBLE, NOT NULL, DEFAULT 3.14\","
67 "\"field_name6\":\"STRING, NOT NULL, DEFAULT '3.1415'\","
68 "\"field_name7\":\"LONG, DEFAULT 100\","
69 "\"field_name8\":\"LONG, DEFAULT 100\","
70 "\"field_name9\":\"LONG, DEFAULT 100\","
71 "\"field_name10\":\"LONG, DEFAULT 100\""
72 "},"
73 "\"SCHEMA_INDEXES\":[\"$.field_name1\", \"$.field_name2\"]}";
74
75 const std::string SCHEMA_VALUE1 =
76 "{\"field_name1\":true,"
77 "\"field_name2\":false,"
78 "\"field_name3\":10,"
79 "\"field_name4\":20,"
80 "\"field_name5\":3.14,"
81 "\"field_name6\":\"3.1415\","
82 "\"field_name7\":100,"
83 "\"field_name8\":100,"
84 "\"field_name9\":100,"
85 "\"field_name10\":100}";
86
87 const std::string SCHEMA_VALUE2 =
88 "{\"field_name1\":false,"
89 "\"field_name2\":true,"
90 "\"field_name3\":100,"
91 "\"field_name4\":200,"
92 "\"field_name5\":3.14,"
93 "\"field_name6\":\"3.1415\","
94 "\"field_name7\":100,"
95 "\"field_name8\":100,"
96 "\"field_name9\":100,"
97 "\"field_name10\":100}";
98
99 class DistributedDBSingleVerP2PSubscribeSyncTest : public testing::Test {
100 public:
101 static void SetUpTestCase(void);
102 static void TearDownTestCase(void);
103 void SetUp();
104 void TearDown();
105 protected:
106 static void WaitUntilNotify(KvVirtualDevice &virtualDevice);
107 };
108
SetUpTestCase(void)109 void DistributedDBSingleVerP2PSubscribeSyncTest::SetUpTestCase(void)
110 {
111 /**
112 * @tc.setup: Init datadir and Virtual Communicator.
113 */
114 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
115 g_config.dataDir = g_testDir;
116 g_schemaMgr.SetKvStoreConfig(g_config);
117
118 string dir = g_testDir + "/single_ver";
119 DIR* dirTmp = opendir(dir.c_str());
120 if (dirTmp == nullptr) {
121 OS::MakeDBDirectory(dir);
122 } else {
123 closedir(dirTmp);
124 }
125
126 g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
127 ASSERT_TRUE(g_communicatorAggregator != nullptr);
128 RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
129 }
130
TearDownTestCase(void)131 void DistributedDBSingleVerP2PSubscribeSyncTest::TearDownTestCase(void)
132 {
133 /**
134 * @tc.teardown: Release virtual Communicator and clear data dir.
135 */
136 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
137 LOGE("rm test db files error!");
138 }
139 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
140 }
141
SetUp(void)142 void DistributedDBSingleVerP2PSubscribeSyncTest::SetUp(void)
143 {
144 DistributedDBToolsUnitTest::PrintTestCaseInfo();
145 /**
146 * @tc.setup: create virtual device B and get a KvStoreNbDelegate as deviceA
147 */
148 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
149 ASSERT_TRUE(g_deviceB != nullptr);
150 VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
151 ASSERT_TRUE(syncInterfaceB != nullptr);
152 syncInterfaceB->SetSchemaInfo(SCHEMA_STRING);
153 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
154 g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
155 ASSERT_TRUE(g_deviceC != nullptr);
156 VirtualSingleVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
157 ASSERT_TRUE(syncInterfaceC != nullptr);
158 syncInterfaceC->SetSchemaInfo(SCHEMA_STRING);
159 ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
160 }
161
TearDown(void)162 void DistributedDBSingleVerP2PSubscribeSyncTest::TearDown(void)
163 {
164 /**
165 * @tc.teardown: Release device A, B
166 */
167 if (g_schemaKvDelegatePtr != nullptr) {
168 ASSERT_EQ(g_schemaMgr.CloseKvStore(g_schemaKvDelegatePtr), OK);
169 g_schemaKvDelegatePtr = nullptr;
170 DBStatus status = g_schemaMgr.DeleteKvStore(SCHEMA_STORE_ID);
171 LOGD("delete kv store status %d", status);
172 ASSERT_TRUE(status == OK);
173 }
174 if (g_deviceB != nullptr) {
175 delete g_deviceB;
176 g_deviceB = nullptr;
177 }
178 if (g_deviceC != nullptr) {
179 delete g_deviceC;
180 g_deviceC = nullptr;
181 }
182 PermissionCheckCallbackV2 nullCallback;
183 EXPECT_EQ(g_schemaMgr.SetPermissionCheckCallback(nullCallback), OK);
184 }
185
WaitUntilNotify(KvVirtualDevice & virtualDevice)186 void DistributedDBSingleVerP2PSubscribeSyncTest::WaitUntilNotify(KvVirtualDevice &virtualDevice)
187 {
188 bool notify = false;
189 std::condition_variable cv;
190 std::mutex notifyMutex;
191 virtualDevice.SetPushNotifier([¬ify, &cv, ¬ifyMutex](const std::string &) {
192 {
193 std::lock_guard<std::mutex> autoLock(notifyMutex);
194 notify = true;
195 }
196 cv.notify_all();
197 });
198 {
199 LOGI("Begin wait notify");
200 std::unique_lock<std::mutex> uniqueLock(notifyMutex);
201 (void)cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [¬ify]() {
202 return notify;
203 });
204 LOGI("End wait notify");
205 }
206 virtualDevice.SetPushNotifier(nullptr);
207 std::this_thread::sleep_for(std::chrono::seconds(1));
208 }
209
InitSubSchemaDb()210 void InitSubSchemaDb()
211 {
212 g_config.dataDir = g_testDir;
213 g_schemaMgr.SetKvStoreConfig(g_config);
214 KvStoreNbDelegate::Option option;
215 option.schema = SCHEMA_STRING;
216 g_schemaMgr.GetKvStore(SCHEMA_STORE_ID, option, g_schemaKvDelegateCallback);
217 ASSERT_TRUE(g_schemaKvDelegateStatus == OK);
218 ASSERT_TRUE(g_schemaKvDelegatePtr != nullptr);
219 }
220
CheckUnFinishedMap(uint32_t sizeA,uint32_t sizeB,std::vector<std::string> & deviceAQueies,std::vector<std::string> & deviceBQueies,SubscribeManager & subManager)221 void CheckUnFinishedMap(uint32_t sizeA, uint32_t sizeB, std::vector<std::string> &deviceAQueies,
222 std::vector<std::string> &deviceBQueies, SubscribeManager &subManager)
223 {
224 std::map<std::string, std::vector<QuerySyncObject>> allSyncQueries;
225 subManager.GetAllUnFinishSubQueries(allSyncQueries);
226 ASSERT_TRUE(allSyncQueries[DEVICE_A].size() == sizeA);
227 ASSERT_TRUE(allSyncQueries[DEVICE_B].size() == sizeB);
228 for (auto &item : allSyncQueries[DEVICE_A]) {
229 std::string queryId = item.GetIdentify();
230 ASSERT_TRUE(std::find(deviceAQueies.begin(), deviceAQueies.end(), queryId) != deviceAQueies.end());
231 }
232 for (auto &item : allSyncQueries[DEVICE_B]) {
233 std::string queryId = item.GetIdentify();
234 ASSERT_TRUE(std::find(deviceBQueies.begin(), deviceBQueies.end(), queryId) != deviceBQueies.end());
235 }
236 }
237
InitLocalSubscribeMap(QuerySyncObject & queryCommonObj,std::map<std::string,QuerySyncObject> & queryMap,std::vector<std::string> & deviceAQueies,std::vector<std::string> & deviceBQueies,SubscribeManager & subManager)238 void InitLocalSubscribeMap(QuerySyncObject &queryCommonObj, std::map<std::string, QuerySyncObject> &queryMap,
239 std::vector<std::string> &deviceAQueies, std::vector<std::string> &deviceBQueies, SubscribeManager &subManager)
240 {
241 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
242 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
243 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_B, queryCommonObj) == E_OK);
244 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_B, queryCommonObj) == E_OK);
245 queryMap[queryCommonObj.GetIdentify()] = queryCommonObj;
246 deviceAQueies.push_back(queryCommonObj.GetIdentify());
247 deviceBQueies.push_back(queryCommonObj.GetIdentify());
248 for (int i = 0; i < 3; i++) { // 3 subscribe
249 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
250 deviceAQueies.push_back(querySyncObj.GetIdentify());
251 queryMap[querySyncObj.GetIdentify()] = querySyncObj;
252 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_A, querySyncObj) == E_OK);
253 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_A, querySyncObj) == E_OK);
254 }
255 for (int i = 0; i < 1; i++) {
256 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('b' + i)}));
257 deviceBQueies.push_back(querySyncObj.GetIdentify());
258 queryMap[querySyncObj.GetIdentify()] = querySyncObj;
259 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_B, querySyncObj) == E_OK);
260 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_B, querySyncObj) == E_OK);
261 }
262 }
263 /**
264 * @tc.name: SubscribeRequestTest001
265 * @tc.desc: test Serialize/DoSerialize SubscribeRequest
266 * @tc.type: FUNC
267 * @tc.require:
268 * @tc.author: zhuwentao
269 */
270 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeRequestTest001, TestSize.Level1)
271 {
272 /**
273 * @tc.steps: step1. prepare a SubscribeRequest.
274 */
275 auto packet = new (std::nothrow) SubscribeRequest;
276 ASSERT_TRUE(packet != nullptr);
277 packet->SetPacketHead(100, SOFTWARE_VERSION_CURRENT, SUBSCRIBE_QUERY_CMD, 1);
278 Query query = Query::Select().EqualTo("$.field_name1", 1);
279 QuerySyncObject syncQuery(query);
280 packet->SetQuery(syncQuery);
281
282 /**
283 * @tc.steps: step2. put the SubscribeRequest Packet into a message.
284 */
285 Message msg;
286 msg.SetExternalObject(packet);
287 msg.SetMessageId(CONTROL_SYNC_MESSAGE);
288 msg.SetMessageType(TYPE_REQUEST);
289
290 /**
291 * @tc.steps: step3. Serialization the message to a buffer.
292 */
293 int len = static_cast<int>(SingleVerSerializeManager::CalculateLen(&msg));
294 LOGE("test leng = %d", len);
295 uint8_t *buffer = new (nothrow) uint8_t[len];
296 ASSERT_TRUE(buffer != nullptr);
297 ASSERT_EQ(SingleVerSerializeManager::Serialization(buffer, len, &msg), E_OK);
298
299 /**
300 * @tc.steps: step4. DeSerialization the buffer to a message.
301 */
302 Message outMsg;
303 outMsg.SetMessageId(CONTROL_SYNC_MESSAGE);
304 outMsg.SetMessageType(TYPE_REQUEST);
305 ASSERT_EQ(SingleVerSerializeManager::DeSerialization(buffer, len, &outMsg), E_OK);
306
307 /**
308 * @tc.steps: step5. checkout the outMsg.
309 * @tc.expected: step5. outMsg equal the the in msg
310 */
311 auto outPacket = outMsg.GetObject<SubscribeRequest>();
312 EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
313 EXPECT_EQ(outPacket->GetSendCode(), 100);
314 EXPECT_EQ(outPacket->GetcontrolCmdType(), SUBSCRIBE_QUERY_CMD);
315 EXPECT_EQ(outPacket->GetFlag(), 1u);
316 EXPECT_EQ(outPacket->GetQuery().GetIdentify(), syncQuery.GetIdentify());
317 delete[] buffer;
318 }
319
320 /**
321 * @tc.name: ControlAckTest001
322 * @tc.desc: test Serialize/DoSerialize ControlAckPacket
323 * @tc.type: FUNC
324 * @tc.require:
325 * @tc.author: zhuwentao
326 */
327 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, ControlAckTest001, TestSize.Level1)
328 {
329 /**
330 * @tc.steps: step1. prepare a ControlAckPacket.
331 */
332 ControlAckPacket packet;
333 packet.SetPacketHead(-E_NOT_SUPPORT, SOFTWARE_VERSION_CURRENT, SUBSCRIBE_QUERY_CMD, 1);
334
335 /**
336 * @tc.steps: step2. put the QuerySyncAckPacket into a message.
337 */
338 Message msg;
339 msg.SetCopiedObject(packet);
340 msg.SetMessageId(CONTROL_SYNC_MESSAGE);
341 msg.SetMessageType(TYPE_RESPONSE);
342
343 /**
344 * @tc.steps: step3. Serialization the message to a buffer.
345 */
346 int len = static_cast<int>(SingleVerSerializeManager::CalculateLen(&msg));
347 LOGE("test leng = %d", len);
348 uint8_t *buffer = new (nothrow) uint8_t[len];
349 ASSERT_TRUE(buffer != nullptr);
350 int errCode = SingleVerSerializeManager::Serialization(buffer, len, &msg);
351 ASSERT_EQ(errCode, E_OK);
352
353 /**
354 * @tc.steps: step4. DeSerialization the buffer to a message.
355 */
356 Message outMsg;
357 outMsg.SetMessageId(CONTROL_SYNC_MESSAGE);
358 outMsg.SetMessageType(TYPE_RESPONSE);
359 errCode = SingleVerSerializeManager::DeSerialization(buffer, len, &outMsg);
360 ASSERT_EQ(errCode, E_OK);
361
362 /**
363 * @tc.steps: step5. checkout the outMsg.
364 * @tc.expected: step5. outMsg equal the the in msg
365 */
366 auto outPacket = outMsg.GetObject<ControlAckPacket>();
367 EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
368 EXPECT_EQ(outPacket->GetRecvCode(), -E_NOT_SUPPORT);
369 EXPECT_EQ(outPacket->GetcontrolCmdType(), SUBSCRIBE_QUERY_CMD);
370 EXPECT_EQ(outPacket->GetFlag(), 1u);
371 delete[] buffer;
372 }
373
374 /**
375 * @tc.name: subscribeManager001
376 * @tc.desc: test subscribe class subscribe local function with one device
377 * @tc.type: FUNC
378 * @tc.require:
379 * @tc.author: zhuwentao
380 */
381 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager001, TestSize.Level1)
382 {
383 SubscribeManager subManager;
384 std::string device = "device_A";
385 /**
386 * @tc.steps: step1. test one device limit four subscribe queries in local map
387 */
388 LOGI("============step 1============");
389 for (int i = 0; i < 4; i++) {
390 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
391 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj) == E_OK);
392 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device, querySyncObj) == E_OK);
393 }
394 std::vector<QuerySyncObject> subscribeQueries;
395 subManager.GetLocalSubscribeQueries(device, subscribeQueries);
396 ASSERT_TRUE(subscribeQueries.size() == 4);
397 subscribeQueries.clear();
398 QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 4)}));
399 int errCode = subManager.ReserveLocalSubscribeQuery(device, querySyncObj1);
400 ASSERT_TRUE(errCode != E_OK);
401 /**
402 * @tc.steps: step2. allow to subscribe existed query
403 */
404 LOGI("============step 2============");
405 QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 3)}));
406 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
407 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
408 subManager.GetLocalSubscribeQueries(device, subscribeQueries);
409 ASSERT_TRUE(subscribeQueries.size() == 4);
410 subscribeQueries.clear();
411 /**
412 * @tc.steps: step3. unsubscribe no existed queries
413 */
414 LOGI("============step 3============");
415 subManager.RemoveLocalSubscribeQuery(device, querySyncObj1);
416 subManager.GetLocalSubscribeQueries(device, subscribeQueries);
417 ASSERT_TRUE(subscribeQueries.size() == 4);
418 subscribeQueries.clear();
419 /**
420 * @tc.steps: step4. unsubscribe queries
421 */
422 LOGI("============step 4============");
423 for (int i = 0; i < 4; i++) {
424 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
425 subManager.RemoveLocalSubscribeQuery(device, querySyncObj);
426 }
427 subManager.GetLocalSubscribeQueries(device, subscribeQueries);
428 ASSERT_TRUE(subscribeQueries.size() == 0);
429
430 /**
431 * @tc.steps: step5. reserve twice while subscribe queries
432 */
433 LOGI("============step 5============");
434 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
435 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
436 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
437 subManager.GetLocalSubscribeQueries(device, subscribeQueries);
438 ASSERT_TRUE(subscribeQueries.size() == 1);
439 subscribeQueries.clear();
440 subManager.RemoveLocalSubscribeQuery(device, querySyncObj2);
441 subManager.GetLocalSubscribeQueries(device, subscribeQueries);
442 ASSERT_TRUE(subscribeQueries.size() == 0);
443 }
444
445 /**
446 * @tc.name: subscribeManager002
447 * @tc.desc: test subscribe class subscribe remote function with one device
448 * @tc.type: FUNC
449 * @tc.require:
450 * @tc.author: zhuwentao
451 */
452 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager002, TestSize.Level1)
453 {
454 SubscribeManager subManager;
455 std::string device = "device_A";
456 /**
457 * @tc.steps: step1. test one device limit four subscribe queries in remote map
458 */
459 LOGI("============step 1============");
460 for (int i = 0; i < 4; i++) {
461 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
462 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device, querySyncObj) == E_OK);
463 ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device, querySyncObj) == E_OK);
464 }
465 QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 4)}));
466 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device, querySyncObj1) != E_OK);
467 std::vector<std::string> subscribeQueryId;
468 subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
469 ASSERT_TRUE(subscribeQueryId.size() == 4);
470 subscribeQueryId.clear();
471 /**
472 * @tc.steps: step2. allow to subscribe existed query
473 */
474 LOGI("============step 2============");
475 QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 3)}));
476 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device, querySyncObj2) == E_OK);
477 ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device, querySyncObj2) == E_OK);
478 subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
479 ASSERT_TRUE(subscribeQueryId.size() == 4);
480 subscribeQueryId.clear();
481 /**
482 * @tc.steps: step3. unsubscribe no existed queries
483 */
484 LOGI("============step 3============");
485 subManager.RemoveRemoteSubscribeQuery(device, querySyncObj1);
486 subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
487 ASSERT_TRUE(subscribeQueryId.size() == 4);
488 subscribeQueryId.clear();
489 /**
490 * @tc.steps: step4. unsubscribe queries
491 */
492 LOGI("============step 4============");
493 for (int i = 0; i < 4; i++) {
494 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
495 subManager.RemoveRemoteSubscribeQuery(device, querySyncObj);
496 }
497 subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
498 ASSERT_TRUE(subscribeQueryId.size() == 0);
499 }
500
501 /**
502 * @tc.name: subscribeManager003
503 * @tc.desc: test subscribe class subscribe remote function with multi device
504 * @tc.type: FUNC
505 * @tc.require:
506 * @tc.author: zhuwentao
507 */
508 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager003, TestSize.Level1)
509 {
510 SubscribeManager subManager;
511 std::string device = "device_";
512 std::vector<QuerySyncObject> subscribeQueries;
513 /**
514 * @tc.steps: step1. test mutil device limit 32 devices in remote map and check each device has one subscribe
515 */
516 LOGI("============step 1============");
517 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 1)}));
518 for (int i = 0; i < 32; i++) {
519 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
520 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
521 }
522 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(33), querySyncObj) != E_OK);
523 for (int i = 0; i < 32; i++) {
524 subManager.GetLocalSubscribeQueries(device + std::to_string(i), subscribeQueries);
525 ASSERT_TRUE(subscribeQueries.size() == 1);
526 subscribeQueries.clear();
527 }
528 /**
529 * @tc.steps: step2. clear remote subscribe query map and check each device has no subscribe
530 */
531 LOGI("============step 2============");
532 for (int i = 0; i < 32; i++) {
533 subManager.ClearLocalSubscribeQuery(device + std::to_string(i));
534 subManager.GetLocalSubscribeQueries(device + std::to_string(i), subscribeQueries);
535 ASSERT_TRUE(subscribeQueries.size() == 0);
536 subscribeQueries.clear();
537 }
538 /**
539 * @tc.steps: step3. test mutil device limit 8 queries in db and check each device has one subscribe
540 */
541 LOGI("============step 3============");
542 for (int i = 0; i < 8; i++) {
543 QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
544 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
545 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
546 }
547 QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 8)}));
548 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(8), querySyncObj1) != E_OK);
549 }
550
551 /**
552 * @tc.name: subscribeManager004
553 * @tc.desc: test subscribe class subscribe remote function with multi device
554 * @tc.type: FUNC
555 * @tc.require:
556 * @tc.author: zhuwentao
557 */
558 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager004, TestSize.Level1)
559 {
560 SubscribeManager subManager;
561 std::string device = "device_";
562 std::vector<std::string> subscribeQueryId;
563 /**
564 * @tc.steps: step1. test mutil device limit 32 devices in remote map and check each device has one subscribe
565 */
566 LOGI("============step 1============");
567 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 1)}));
568 for (int i = 0; i < 32; i++) {
569 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
570 ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
571 }
572 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(33), querySyncObj) != E_OK);
573 for (int i = 0; i < 32; i++) {
574 subManager.GetRemoteSubscribeQueryIds(device + std::to_string(i), subscribeQueryId);
575 ASSERT_TRUE(subscribeQueryId.size() == 1);
576 subscribeQueryId.clear();
577 }
578 /**
579 * @tc.steps: step2. clear remote subscribe query map and check each device has no subscribe
580 */
581 LOGI("============step 2============");
582 for (int i = 0; i < 32; i++) {
583 subManager.ClearRemoteSubscribeQuery(device + std::to_string(i));
584 subManager.GetRemoteSubscribeQueryIds(device + std::to_string(i), subscribeQueryId);
585 ASSERT_TRUE(subscribeQueryId.size() == 0);
586 subscribeQueryId.clear();
587 }
588 subManager.ClearRemoteSubscribeQuery(device);
589 /**
590 * @tc.steps: step3. test mutil device limit 8 queries in db and check each device has one subscribe
591 */
592 LOGI("============step 3============");
593 for (int i = 0; i < 8; i++) {
594 QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
595 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
596 ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
597 }
598 QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 8)}));
599 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(8), querySyncObj1) != E_OK);
600 }
601
602 /**
603 * @tc.name: subscribeManager005
604 * @tc.desc: test subscribe class subscribe remote function with put into unfinished map
605 * @tc.type: FUNC
606 * @tc.require:
607 * @tc.author: zhuwentao
608 */
609 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager005, TestSize.Level1)
610 {
611 SubscribeManager subManager;
612 std::vector<QuerySyncObject> subscribeQueries;
613 std::map<std::string, QuerySyncObject> queryMap;
614 std::vector<std::string> deviceAQueies;
615 std::vector<std::string> deviceBQueies;
616 QuerySyncObject queryCommonObj(Query::Select().PrefixKey({'a'}));
617 /**
618 * @tc.steps: step1. test one devices has 4 subscribes and another has 2 in local map, put into unfinished map
619 */
620 LOGI("============step 1============");
621 InitLocalSubscribeMap(queryCommonObj, queryMap, deviceAQueies, deviceBQueies, subManager);
622 /**
623 * @tc.steps: step2. check all device unFinished subscribe queries and put into unfinished map
624 */
625 LOGI("============step 2============");
626 subManager.GetLocalSubscribeQueries(DEVICE_A, subscribeQueries);
627 ASSERT_TRUE(subscribeQueries.size() == 4);
628 subManager.PutLocalUnFinishedSubQueries(DEVICE_A, subscribeQueries);
629 subscribeQueries.clear();
630 subManager.GetLocalSubscribeQueries(DEVICE_B, subscribeQueries);
631 ASSERT_TRUE(subscribeQueries.size() == 2);
632 subManager.PutLocalUnFinishedSubQueries(DEVICE_B, subscribeQueries);
633 subscribeQueries.clear();
634 /**
635 * @tc.steps: step3. get all device unFinished subscribe queries and check
636 */
637 LOGI("============step 3============");
638 CheckUnFinishedMap(4, 2, deviceAQueies, deviceBQueies, subManager);
639 /**
640 * @tc.steps: step4. active some subscribe queries
641 */
642 LOGI("============step 4============");
643 subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryCommonObj);
644 subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryMap[deviceAQueies[3]]);
645 subManager.ActiveLocalSubscribeQuery(DEVICE_B, queryMap[deviceBQueies[1]]);
646 deviceAQueies.erase(deviceAQueies.begin() + 3);
647 deviceAQueies.erase(deviceAQueies.begin());
648 queryMap.erase(queryMap[deviceBQueies[1]].GetIdentify());
649 deviceBQueies.erase(deviceBQueies.begin() + 1);
650 /**
651 * @tc.steps: step5. get all device unFinished subscribe queries and check
652 */
653 LOGI("============step 5============");
654 CheckUnFinishedMap(2, 1, deviceAQueies, deviceBQueies, subManager);
655 /**
656 * @tc.steps: step6. remove left subscribe queries
657 */
658 LOGI("============step 6============");
659 for (int i = 0; i < 2; i++) {
660 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
661 subManager.RemoveLocalSubscribeQuery(DEVICE_A, querySyncObj);
662 }
663 subManager.RemoveLocalSubscribeQuery(DEVICE_A, queryCommonObj);
664 subManager.RemoveLocalSubscribeQuery(DEVICE_B, queryCommonObj);
665 /**
666 * @tc.steps: step7. get all device unFinished subscribe queries and check
667 */
668 LOGI("============step 7============");
669 CheckUnFinishedMap(0, 0, deviceAQueies, deviceBQueies, subManager);
670 }
671
672 /**
673 * @tc.name: subscribeManager006
674 * @tc.desc: test exception branch of subscribe manager
675 * @tc.type: FUNC
676 * @tc.require:
677 * @tc.author: zhangshijie
678 */
679 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager006, TestSize.Level1)
680 {
681 /**
682 * @tc.steps: step1. active a query sync object which is not in local subscribe map
683 * @tc.expected:step1 return -E_INTERNAL_ERROR
684 */
685 SubscribeManager subManager;
686 QuerySyncObject queryCommonObj(Query::Select().PrefixKey({'a'}));
687 EXPECT_EQ(subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryCommonObj), -E_INTERNAL_ERROR);
688 subManager.DeleteLocalSubscribeQuery(DEVICE_A, queryCommonObj);
689 subManager.RemoveLocalSubscribeQuery(DEVICE_A, queryCommonObj);
690 std::vector<QuerySyncObject> subscribeQueries;
691 subManager.PutLocalUnFinishedSubQueries(DEVICE_A, subscribeQueries);
692 std::map<std::string, std::vector<QuerySyncObject>> allSyncQueries;
693 subManager.GetAllUnFinishSubQueries(allSyncQueries);
694
695 /**
696 * @tc.steps: step2. call IsLastRemoteContainSubscribe with a device not in remote subscribe map
697 * @tc.expected: step2 return false
698 */
699 std::string queryId = "queryId";
700 EXPECT_EQ(subManager.IsLastRemoteContainSubscribe(DEVICE_A, queryId), false);
701
702 /**
703 * @tc.steps: step3. active local subscribe with a device which is not in local subscribe map and
704 * a query sync object which is in local subscribe map
705 * @tc.expected: step3 return -E_INTERNAL_ERROR
706 */
707 std::vector<std::string> deviceAQueies;
708 std::vector<std::string> deviceBQueies;
709 std::map<std::string, QuerySyncObject> queryMap;
710 InitLocalSubscribeMap(queryCommonObj, queryMap, deviceAQueies, deviceBQueies, subManager);
711 ASSERT_TRUE(queryMap.size() > 0);
712 std::string devNotExists = "device_not_exists";
713 EXPECT_EQ(subManager.ActiveLocalSubscribeQuery(devNotExists, queryMap.begin()->second), -E_INTERNAL_ERROR);
714 QuerySyncObject queryObj(Query::Select().PrefixKey({'b'}));
715 EXPECT_EQ(subManager.ReserveLocalSubscribeQuery("test_dev", queryObj), E_OK);
716 subManager.DeleteLocalSubscribeQuery(DEVICE_A, queryObj);
717
718 EXPECT_EQ(subManager.ActiveLocalSubscribeQuery(DEVICE_B, queryObj), -E_INTERNAL_ERROR);
719 subManager.DeleteLocalSubscribeQuery(DEVICE_A, queryCommonObj);
720 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
721 ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
722 EXPECT_EQ(subManager.IsLastRemoteContainSubscribe(DEVICE_A, queryId), false);
723 deviceAQueies.push_back(DEVICE_A);
724 EXPECT_EQ(subManager.LocalSubscribeLimitCheck(deviceAQueies, queryCommonObj), E_OK);
725
726 /**
727 * @tc.steps: step4. add MAX_DEVICES_NUM device, then call LocalSubscribeLimitCheck
728 * @tc.expected: step4 return -E_MAX_LIMITS
729 */
730 for (size_t i = 0 ; i < MAX_DEVICES_NUM; i++) {
731 deviceAQueies.push_back("device_" + std::to_string(i));
732 }
733 EXPECT_EQ(subManager.LocalSubscribeLimitCheck(deviceAQueies, queryCommonObj), -E_MAX_LIMITS);
734 }
735
736 /**
737 * @tc.name: subscribeSync001
738 * @tc.desc: test subscribe normal sync
739 * @tc.type: FUNC
740 * @tc.require:
741 * @tc.author: zhuwentao
742 */
743 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync001, TestSize.Level1)
744 {
745 /**
746 * @tc.steps: step1. InitSchemaDb
747 */
748 LOGI("============step 1============");
749 InitSubSchemaDb();
750 DBStatus status = OK;
751 std::vector<std::string> devices;
752 devices.push_back(g_deviceB->GetDeviceId());
753 Query query = Query::Select().EqualTo("$.field_name1", 1);
754 QuerySyncObject querySyncObj(query);
755
756 /**
757 * @tc.steps: step2. deviceB subscribe query to deviceA
758 */
759 LOGI("============step 2============");
760 g_deviceB->Subscribe(querySyncObj, true, 1);
761
762 /**
763 * @tc.steps: step3. deviceA put {key1, SCHEMA_VALUE1} and wait 1s
764 */
765 LOGI("============step 3============");
766 Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
767 Key key = {'1'};
768 status = g_schemaKvDelegatePtr->Put(key, value);
769 EXPECT_EQ(status, OK);
770 WaitUntilNotify(*g_deviceB);
771 /**
772 * @tc.steps: step4. deviceB has {key11, SCHEMA_VALUE1}
773 */
774 LOGI("============step 4============");
775 VirtualDataItem item;
776 g_deviceB->GetData(key, item);
777 EXPECT_TRUE(item.value == value);
778
779 /**
780 * @tc.steps: step5. deviceB unsubscribe query to deviceA
781 */
782 g_deviceB->UnSubscribe(querySyncObj, true, 2);
783
784 /**
785 * @tc.steps: step5. deviceA put {key2, SCHEMA_VALUE1} and wait 1s
786 */
787 LOGI("============step 5============");
788 Value value2(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
789 Key key2 = {'2'};
790 status = g_schemaKvDelegatePtr->Put(key2, value2);
791 EXPECT_EQ(status, OK);
792 WaitUntilNotify(*g_deviceB);
793 /**
794 * @tc.steps: step6. deviceB don't has {key2, SCHEMA_VALUE1}
795 */
796 LOGI("============step 6============");
797 VirtualDataItem item2;
798 EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
799 }
800
801 /**
802 * @tc.name: subscribeSync002
803 * @tc.desc: test subscribe sync over 32 devices,limit,orderBy
804 * @tc.type: FUNC
805 * @tc.require:
806 * @tc.author: zhuwentao
807 */
808 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync002, TestSize.Level1)
809 {
810 /**
811 * @tc.steps: step1. InitSchemaDb
812 */
813 LOGI("============step 1============");
814 InitSubSchemaDb();
815 std::vector<std::string> devices;
816 std::string device = "device_";
817 Query query = Query::Select().EqualTo("$.field_name1", 1);
818
819 /**
820 * @tc.steps: step2. deviceA subscribe query to 33 devices, and return overlimit
821 */
822 LOGI("============step 2============");
823 for (int i = 0; i < 33; i++) {
824 devices.push_back(device + std::to_string(i));
825 }
826 EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query, true) == OVER_MAX_LIMITS);
827
828 /**
829 * @tc.steps: step3. deviceA subscribe query with limit
830 */
831 LOGI("============step 3============");
832 devices.clear();
833 devices.push_back("device_B");
834 Query query2 = Query::Select().EqualTo("$.field_name1", 1).Limit(20, 0);
835 EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query2, true) == NOT_SUPPORT);
836
837 /**
838 * @tc.steps: step4. deviceA subscribe query with orderBy
839 */
840 LOGI("============step 4============");
841 Query query3 = Query::Select().EqualTo("$.field_name1", 1).OrderBy("$.field_name7");
842 EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query3, true) == NOT_SUPPORT);
843 }
844
845 /**
846 * @tc.name: subscribeSync003
847 * @tc.desc: test subscribe sync with inkeys query
848 * @tc.type: FUNC
849 * @tc.require:
850 * @tc.author: lidongwei
851 */
852 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync003, TestSize.Level1)
853 {
854 /**
855 * @tc.steps: step1. InitSchemaDb
856 */
857 LOGI("============step 1============");
858 InitSubSchemaDb();
859 std::vector<std::string> devices;
860 devices.push_back(g_deviceB->GetDeviceId());
861 g_deviceB->Online();
862
863 /**
864 * @tc.steps: step2. deviceB subscribe inkeys(k2k4) query to deviceA
865 */
866 LOGI("============step 2============");
867 Query query = Query::Select().InKeys({KEY_2, KEY_4});
868 g_deviceB->Subscribe(QuerySyncObject(query), true, 1);
869
870 /**
871 * @tc.steps: step3. deviceA put k1-k5 and wait
872 */
873 LOGI("============step 3============");
874 EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
875 {KEY_1, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
876 {KEY_2, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
877 {KEY_3, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
878 {KEY_4, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
879 {KEY_5, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
880 }));
881 WaitUntilNotify(*g_deviceB);
882
883 /**
884 * @tc.steps: step4. deviceB has k2k4, has no k1k3k5
885 */
886 LOGI("============step 4============");
887 VirtualDataItem item;
888 EXPECT_EQ(g_deviceB->GetData(KEY_2, item), E_OK);
889 EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
890 EXPECT_EQ(g_deviceB->GetData(KEY_4, item), E_OK);
891 EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
892 EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
893 EXPECT_EQ(g_deviceB->GetData(KEY_3, item), -E_NOT_FOUND);
894 EXPECT_EQ(g_deviceB->GetData(KEY_5, item), -E_NOT_FOUND);
895 }
896
897 /**
898 * @tc.name: subscribeSync004
899 * @tc.desc: test subscribe sync with inkeys query
900 * @tc.type: FUNC
901 * @tc.require:
902 * @tc.author: lidongwei
903 */
904 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync004, TestSize.Level1)
905 {
906 /**
907 * @tc.steps: step1. InitSchemaDb
908 */
909 LOGI("============step 1============");
910 InitSubSchemaDb();
911 std::vector<std::string> devices;
912 devices.push_back(g_deviceB->GetDeviceId());
913
914 /**
915 * @tc.steps: step2. deviceB subscribe inkeys(k3k5) and equal to query to deviceA
916 */
917 LOGI("============step 2============");
918 Query query = Query::Select().InKeys({KEY_3, KEY_5}).EqualTo("$.field_name3", 100); // 100 for test.
919 g_deviceB->Subscribe(QuerySyncObject(query), true, 2);
920
921 /**
922 * @tc.steps: step3. deviceA put k1v2,k3v2,k5v1 and wait
923 */
924 LOGI("============step 3============");
925 EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
926 {KEY_1, Value(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end())},
927 {KEY_3, Value(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end())},
928 {KEY_5, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
929 }));
930 WaitUntilNotify(*g_deviceB);
931
932 /**
933 * @tc.steps: step4. deviceB has k3, has no k1k5
934 */
935 LOGI("============step 4============");
936 VirtualDataItem item;
937 EXPECT_EQ(g_deviceB->GetData(KEY_3, item), E_OK);
938 EXPECT_EQ(item.value, Value(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end()));
939 EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
940 EXPECT_EQ(g_deviceB->GetData(KEY_5, item), -E_NOT_FOUND);
941 }
942
943 /**
944 * @tc.name: subscribeSync005
945 * @tc.desc: test subscribe sync with inkeys query
946 * @tc.type: FUNC
947 * @tc.require:
948 * @tc.author: lidongwei
949 */
950 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync005, TestSize.Level1)
951 {
952 /**
953 * @tc.steps: step1. InitSchemaDb
954 */
955 LOGI("============step 1============");
956 InitSubSchemaDb();
957 std::vector<std::string> devices;
958 devices.push_back(g_deviceB->GetDeviceId());
959
960 /**
961 * @tc.steps: step2. deviceB subscribe inkeys(k1, key6) and prefix key "k" query to deviceA
962 */
963 LOGI("============step 2============");
964 Key key6 { 'k', '6' };
965 Query query = Query::Select().InKeys({KEY_1, key6}).PrefixKey({ 'k' });
966 g_deviceB->Subscribe(QuerySyncObject(query), true, 3);
967
968 /**
969 * @tc.steps: step3. deviceA put k1,key6 and wait
970 */
971 LOGI("============step 3============");
972 EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
973 {key6, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
974 {KEY_1, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
975 }));
976 WaitUntilNotify(*g_deviceB);
977
978 /**
979 * @tc.steps: step4. deviceB has key6, has no k1
980 */
981 LOGI("============step 4============");
982 VirtualDataItem item;
983 EXPECT_EQ(g_deviceB->GetData(key6, item), E_OK);
984 EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
985 EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
986 }
987
988
989 /**
990 * @tc.name: subscribeSync006
991 * @tc.desc: test one device unsubscribe no effect other device
992 * @tc.type: FUNC
993 * @tc.require:
994 * @tc.author: zhangqiquan
995 */
996 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync006, TestSize.Level1)
997 {
998 /**
999 * @tc.steps: step1. InitSchemaDb
1000 */
1001 LOGI("============step 1============");
1002 InitSubSchemaDb();
1003 std::vector<std::string> devices;
1004 devices.push_back(g_deviceB->GetDeviceId());
1005 devices.push_back(g_deviceC->GetDeviceId());
1006
1007 /**
1008 * @tc.steps: step2. deviceB unsubscribe inkeys(k1, key6) and prefix key "k" query to deviceA
1009 */
1010 LOGI("============step 2============");
1011 Key key6 { 'k', '6' };
1012 Query query = Query::Select().InKeys({KEY_1, key6}).PrefixKey({ 'k' });
1013 g_deviceB->Online();
1014 g_deviceC->Online();
1015 g_deviceB->Subscribe(QuerySyncObject(query), true, 3);
1016 g_deviceC->Subscribe(QuerySyncObject(query), true, 3);
1017
1018 /**
1019 * @tc.steps: step3. deviceC unsubscribe
1020 */
1021 LOGI("============step 3============");
1022 g_deviceC->UnSubscribe(QuerySyncObject(query), true, 3);
1023
1024 /**
1025 * @tc.steps: step4. deviceA put k1,key6 and wait
1026 */
1027 LOGI("============step 4============");
1028 EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
1029 {key6, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
1030 {KEY_1, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
1031 }));
1032 WaitUntilNotify(*g_deviceB);
1033
1034 /**
1035 * @tc.steps: step5. deviceB has key6, has no k1
1036 */
1037 LOGI("============step 5============");
1038 VirtualDataItem item;
1039 EXPECT_EQ(g_deviceB->GetData(key6, item), E_OK);
1040 EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
1041 EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
1042 }
1043
1044 /**
1045 * @tc.name: subscribeSync007
1046 * @tc.desc: test subscribe query with order by write time
1047 * @tc.type: FUNC
1048 * @tc.require:
1049 * @tc.author: zhuwentao
1050 */
1051 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync007, TestSize.Level1)
1052 {
1053 /**
1054 * @tc.steps: step1. InitSchemaDb
1055 */
1056 LOGI("============step 1============");
1057 InitSubSchemaDb();
1058 std::vector<std::string> devices = {"DEVICE_B"};
1059
1060 /**
1061 * @tc.steps: step2. deviceA subscribe query with order by write time
1062 * * @tc.expected: step2. interface return not support
1063 */
1064 Query query = Query::Select().EqualTo("$.field_name1", 1).OrderByWriteTime(false);
1065 EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query, true) == NOT_SUPPORT);
1066 EXPECT_TRUE(g_schemaKvDelegatePtr->UnSubscribeRemoteQuery(devices, nullptr, query, true) == NOT_SUPPORT);
1067 }
1068
1069 /**
1070 * @tc.name: SubscribeSync008
1071 * @tc.desc: test subscribe with reopen db
1072 * @tc.type: FUNC
1073 * @tc.require:
1074 * @tc.author: zhangqiquan
1075 */
1076 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync008, TestSize.Level1)
1077 {
1078 /**
1079 * @tc.steps: step1. InitSchemaDb
1080 */
1081 std::shared_ptr<DBInfoHandleTest> handleTest = std::make_shared<DBInfoHandleTest>();
1082 RuntimeConfig::SetDBInfoHandle(handleTest);
1083
1084 LOGI("============step 1============");
1085 InitSubSchemaDb();
1086 std::vector<std::string> devices;
1087 devices.push_back(g_deviceB->GetDeviceId());
1088
1089 /**
1090 * @tc.steps: step2. deviceB subscribe query to deviceA
1091 */
1092 LOGI("============step 2============");
1093 Key key6 { 'k', '6' };
1094 Query query = Query::Select();
1095 g_deviceB->Subscribe(QuerySyncObject(query), true, 3);
1096
1097 /**
1098 * @tc.steps: step3. deviceA put k1,key6 and wait
1099 */
1100 LOGI("============step 3============");
1101 EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
1102 {key6, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
1103 }));
1104 EXPECT_EQ(g_schemaMgr.CloseKvStore(g_schemaKvDelegatePtr), OK);
1105 g_schemaKvDelegatePtr = nullptr;
1106 InitSubSchemaDb();
1107 g_deviceB->Online();
1108 WaitUntilNotify(*g_deviceB);
1109
1110 /**
1111 * @tc.steps: step4. deviceB has key6
1112 */
1113 LOGI("============step 4============");
1114 VirtualDataItem item;
1115 if (g_deviceB->GetData(key6, item) == E_OK) {
1116 EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
1117 }
1118 RuntimeConfig::SetDBInfoHandle(nullptr);
1119 }
1120
1121 namespace {
CreateKvVirtualDevice(const std::string & deviceName)1122 KvVirtualDevice *CreateKvVirtualDevice(const std::string &deviceName)
1123 {
1124 KvVirtualDevice *device = nullptr;
1125 do {
1126 if (g_communicatorAggregator == nullptr) {
1127 break;
1128 }
1129 device = new (std::nothrow) KvVirtualDevice(deviceName);
1130 if (device == nullptr) {
1131 break;
1132 }
1133 auto interface = new (std::nothrow) VirtualSingleVerSyncDBInterface();
1134 if (interface == nullptr) {
1135 delete device;
1136 device = nullptr;
1137 break;
1138 }
1139 interface->SetSchemaInfo(SCHEMA_STRING);
1140 EXPECT_EQ(device->Initialize(g_communicatorAggregator, interface), E_OK);
1141 } while (false);
1142 return device;
1143 }
1144 }
1145
1146 /**
1147 * @tc.name: SubscribeSync009
1148 * @tc.desc: test subscribe query with 33 device
1149 * @tc.type: FUNC
1150 * @tc.require:
1151 * @tc.author: zhangqiquan
1152 */
1153 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync009, TestSize.Level3)
1154 {
1155 /**
1156 * @tc.steps: step1. InitSchemaDb
1157 */
1158 LOGI("============step 1============");
1159 InitSubSchemaDb();
1160 const int maxDeviceCount = 32;
1161 std::vector<KvVirtualDevice *> devices;
1162 for (int i = 0; i < maxDeviceCount; ++i) {
1163 std::string deviceName = "D_" + std::to_string(i);
1164 auto device = CreateKvVirtualDevice(deviceName);
1165 EXPECT_NE(device, nullptr);
1166 if (device == nullptr) {
1167 continue;
1168 }
1169 devices.push_back(device);
1170 }
1171
1172 /**
1173 * @tc.steps: step2. 33 device subscribe
1174 */
1175 LOGI("============step 2============");
1176 Query query = Query::Select();
1177 for (const auto &dev: devices) {
1178 dev->Online();
1179 dev->Subscribe(QuerySyncObject(query), true, 1); // sync id is 1
1180 }
1181 g_deviceB->Subscribe(QuerySyncObject(query), true, 1); // sync id is 1
1182 /**
1183 * @tc.steps: step3. 32 unsubscribe
1184 */
1185 LOGI("============step 3============");
__anonf124574d0502(std::map<std::string, int> res) 1186 SyncOperation::UserCallback callback = [](std::map<std::string, int> res) {
1187 ASSERT_EQ(res.size(), 1u);
1188 EXPECT_EQ(res["real_device"], SyncOperation::OP_FINISHED_ALL);
1189 };
1190 for (const auto &dev: devices) {
1191 dev->UnSubscribe(QuerySyncObject(query), true, 1, callback); // sync id is 1
1192 delete dev;
1193 }
1194 }
1195
1196 /*
1197 * @tc.name: SubscribeSync010
1198 * @tc.desc: test subscribe query cache
1199 * @tc.type: FUNC
1200 * @tc.require:
1201 * @tc.author: zhangqiquan
1202 */
1203 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync010, TestSize.Level1)
1204 {
1205 SubscribeRecorder recorder;
1206 DBInfo dbInfo = {
1207 USER_ID,
1208 APP_ID,
1209 STORE_ID_1,
1210 false,
1211 true
1212 };
1213 Query query = Query::Select();
1214 QuerySyncObject querySyncObject(query);
1215 /**
1216 * @tc.steps: step1. Insert one record twice and remove
1217 */
1218 recorder.RecordSubscribe(dbInfo, DEVICE_A, querySyncObject);
1219 recorder.RecordSubscribe(dbInfo, DEVICE_A, querySyncObject);
1220 recorder.RemoveRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1221 std::map<std::string, std::vector<QuerySyncObject>> subscribeQuery;
1222 recorder.GetSubscribeQuery(dbInfo, subscribeQuery);
1223 for (const auto &entry: subscribeQuery) {
1224 EXPECT_EQ(entry.second.size(), 0u);
1225 }
1226 /**
1227 * @tc.steps: step2. Remove no exist data
1228 */
1229 recorder.RemoveRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1230 recorder.GetSubscribeQuery(dbInfo, subscribeQuery);
1231 for (const auto &entry: subscribeQuery) {
1232 EXPECT_EQ(entry.second.size(), 0u);
1233 }
1234 /**
1235 * @tc.steps: step3. insert two data and remove one data
1236 */
1237 recorder.RecordSubscribe(dbInfo, DEVICE_A, querySyncObject);
1238 Query query2 = Query::Select().EqualTo("test", "test");
1239 recorder.RecordSubscribe(dbInfo, DEVICE_A, QuerySyncObject(query2));
1240 recorder.RemoveRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1241 recorder.GetSubscribeQuery(dbInfo, subscribeQuery);
1242 for (const auto &entry: subscribeQuery) {
1243 EXPECT_EQ(entry.second.size(), 1u);
1244 }
1245 /**
1246 * @tc.steps: step4. remove no exist data
1247 */
1248 dbInfo.storeId = STORE_ID_2;
1249 recorder.RemoveRemoteSubscribe(dbInfo, DEVICE_A);
1250 dbInfo.storeId = STORE_ID_1;
1251 recorder.GetSubscribeQuery(dbInfo, subscribeQuery);
1252 for (const auto &entry: subscribeQuery) {
1253 EXPECT_EQ(entry.second.size(), 1u);
1254 }
1255 }
1256
1257 /*
1258 * @tc.name: SubscribeSync011
1259 * @tc.desc: test subscribe query cache
1260 * @tc.type: FUNC
1261 * @tc.require:
1262 * @tc.author: zhangqiquan
1263 */
1264 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync011, TestSize.Level1)
1265 {
1266 SubscribeRecorder recorder;
1267 DBInfo dbInfo = {
1268 USER_ID,
1269 APP_ID,
1270 STORE_ID_1,
1271 false,
1272 true
1273 };
1274 /**
1275 * @tc.steps: step1. Insert 2 record in db1 and 1 record in db2
1276 */
1277 Query query = Query::Select();
1278 QuerySyncObject querySyncObject(query);
1279 recorder.RecordSubscribe(dbInfo, DEVICE_A, querySyncObject);
1280 recorder.RecordSubscribe(dbInfo, DEVICE_B, querySyncObject);
1281 DBInfo dbInfo2 = dbInfo;
1282 dbInfo2.storeId = STORE_ID_2;
1283 recorder.RecordSubscribe(dbInfo2, DEVICE_B, querySyncObject);
1284 /**
1285 * @tc.steps: step2. Insert 2 record in db1
1286 */
1287 recorder.RemoveRemoteSubscribe(dbInfo);
1288 std::map<std::string, std::vector<QuerySyncObject>> subscribeQuery;
1289 recorder.GetSubscribeQuery(dbInfo, subscribeQuery);
1290 EXPECT_EQ(subscribeQuery.size(), 0u);
1291 recorder.GetSubscribeQuery(dbInfo2, subscribeQuery);
1292 EXPECT_EQ(subscribeQuery.size(), 1u);
1293 /**
1294 * @tc.steps: step3. Insert 1 record in db2
1295 */
1296 recorder.RemoveRemoteSubscribe(dbInfo2);
1297 subscribeQuery.clear();
1298 recorder.GetSubscribeQuery(dbInfo2, subscribeQuery);
1299 EXPECT_EQ(subscribeQuery.size(), 0u);
1300 }
1301
1302 /**
1303 * @tc.name: subscribeSync012
1304 * @tc.desc: test one device unsubscribe no effect other device
1305 * @tc.type: FUNC
1306 * @tc.require:
1307 * @tc.author: zhangqiquan
1308 */
1309 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync012, TestSize.Level1)
1310 {
1311 /**
1312 * @tc.steps: step1. InitSchemaDb
1313 */
1314 LOGI("============step 1============");
1315 InitSubSchemaDb();
1316 std::vector<std::string> devices;
1317 devices.push_back(g_deviceB->GetDeviceId());
1318 devices.push_back(g_deviceC->GetDeviceId());
1319
1320 /**
1321 * @tc.steps: step2. deviceB unsubscribe inkeys(k1, key6) and prefix key "k" query to deviceA
1322 */
1323 LOGI("============step 2============");
1324 Key key6 { 'k', '6' };
1325 Query query = Query::Select();
1326 g_deviceB->Online();
1327 g_deviceC->Online();
1328 g_deviceB->Subscribe(QuerySyncObject(query), true, 3);
1329 g_deviceC->Subscribe(QuerySyncObject(query), true, 3);
1330
1331 /**
1332 * @tc.steps: step3. deviceC unsubscribe
1333 */
1334 LOGI("============step 3============");
1335 g_deviceC->Offline();
1336
1337 /**
1338 * @tc.steps: step4. deviceA put k1,key6 and wait
1339 */
1340 LOGI("============step 4============");
1341 const uint8_t putItemCount = 10u;
1342 std::vector<Key> dataKeys;
1343 for (uint8_t i = 0u; i < putItemCount; ++i) {
1344 Key key = { i };
1345 dataKeys.push_back(key);
1346 EXPECT_EQ(g_schemaKvDelegatePtr->Put(key, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())), OK);
1347 WaitUntilNotify(*g_deviceB);
1348 }
1349 /**
1350 * @tc.steps: step5. deviceB has key6, has no k1
1351 */
1352 LOGI("============step 5============");
1353 for (const auto &key: dataKeys) {
1354 VirtualDataItem item;
1355 EXPECT_EQ(g_deviceB->GetData(key, item), E_OK);
1356 EXPECT_EQ(g_deviceC->GetData(key, item), -E_NOT_FOUND);
1357 }
1358 }
1359
1360 /*
1361 * @tc.name: SubscribeSync013
1362 * @tc.desc: test subscribe query cache with remote support false
1363 * @tc.type: FUNC
1364 * @tc.require:
1365 * @tc.author: zhangqiquan
1366 */
1367 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync013, TestSize.Level1)
1368 {
1369 std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1370 RuntimeConfig::SetDBInfoHandle(handle);
1371 handle->SetLocalIsSupport(true);
1372 DBInfo dbInfo = {
1373 USER_ID,
1374 APP_ID,
1375 STORE_ID_1,
1376 false,
1377 true
1378 };
1379 RuntimeContext::GetInstance()->SetRemoteOptimizeCommunication(DEVICE_A, false);
1380 Query query = Query::Select();
1381 QuerySyncObject querySyncObject(query);
1382 /**
1383 * @tc.steps: step1. Insert one record
1384 */
1385 RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1386 std::map<std::string, std::vector<QuerySyncObject>> subscribeQuery;
1387 RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
1388 EXPECT_EQ(subscribeQuery.size(), 1u);
1389 /**
1390 * @tc.steps: step2. Remove one record
1391 */
1392 RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1393 RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
1394 EXPECT_EQ(subscribeQuery[DEVICE_A].size(), 0u);
1395 /**
1396 * @tc.steps: step3. Record again and remove by dbInfo and device
1397 */
1398 RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1399 RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, DEVICE_A);
1400 RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
1401 EXPECT_EQ(subscribeQuery.size(), 0u);
1402 /**
1403 * @tc.steps: step4. Record again and remove by device
1404 */
1405 RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1406 RuntimeContext::GetInstance()->RemoveRemoteSubscribe(DEVICE_A);
1407 RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
1408 EXPECT_EQ(subscribeQuery.size(), 0u);
1409 /**
1410 * @tc.steps: step5. Record again and remove by dbInfo
1411 */
1412 RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1413 RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo);
1414 RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
1415 EXPECT_EQ(subscribeQuery.size(), 0u);
1416 RuntimeConfig::SetDBInfoHandle(nullptr);
1417 }
1418 /**
1419 * @tc.name: SubscribeSync014
1420 * @tc.desc: test device subscribe with put a lot of times
1421 * @tc.type: FUNC
1422 * @tc.require:
1423 * @tc.author: zhangqiquan
1424 */
1425 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync014, TestSize.Level3)
1426 {
1427 /**
1428 * @tc.steps: step1. InitSchemaDb
1429 */
1430 LOGI("============step 1============");
1431 InitSubSchemaDb();
1432 std::vector<std::string> devices;
1433 devices.push_back(g_deviceB->GetDeviceId());
1434 /**
1435 * @tc.steps: step2. deviceB unsubscribe inkeys(k1, key6) and prefix key "k" query to deviceA
1436 */
1437 LOGI("============step 2============");
1438 Key key6 { 'k', '6' };
1439 Query query = Query::Select();
1440 g_deviceB->Online();
1441 g_deviceB->Subscribe(QuerySyncObject(query), true, 1);
1442 /**
1443 * @tc.steps: step3. deviceA put a lot of time
1444 * @tc.expected: step3 put performance was not effected by subscribe
1445 */
1446 LOGI("============step 4============");
1447 std::vector<Key> dataKeys;
1448 const uint64_t PUT_LIMIT_30S = 30 * 1000000; // 30s = 30 * 1000000us
1449 LOGD("BEGIN PUT");
1450 for (uint8_t i = 0u; i < 10u; ++i) { // loop 10 times
1451 Key key = { i };
1452 dataKeys.push_back(key);
1453 uint64_t curTime = 0;
1454 uint64_t lastTime = 0;
1455 EXPECT_EQ(OS::GetCurrentSysTimeInMicrosecond(curTime), E_OK);
1456 lastTime = curTime;
1457 EXPECT_EQ(g_schemaKvDelegatePtr->Put(key, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())), OK);
1458 EXPECT_EQ(OS::GetCurrentSysTimeInMicrosecond(curTime), E_OK);
1459 EXPECT_LE(curTime - lastTime, PUT_LIMIT_30S);
1460 }
1461 LOGD("END PUT");
1462 }
1463
1464 /**
1465 * @tc.name: subscribeSync015
1466 * @tc.desc: test subscribe query with range
1467 * @tc.type: FUNC
1468 * @tc.require:
1469 * @tc.author: mazhao
1470 */
1471 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync015, TestSize.Level1)
1472 {
1473 /**
1474 * @tc.steps: step1. InitSchemaDb
1475 */
1476 LOGI("============step 1============");
1477 InitSubSchemaDb();
1478 std::vector<std::string> devices = {"DEVICE_B"};
1479
1480 /**
1481 * @tc.steps: step2. deviceA subscribe query by Range.
1482 * * @tc.expected: step2. interface return not support
1483 */
1484 Query query = Query::Select().Range({}, {});
1485 EXPECT_EQ(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query, true), NOT_SUPPORT);
1486 EXPECT_EQ(g_schemaKvDelegatePtr->UnSubscribeRemoteQuery(devices, nullptr, query, true), NOT_SUPPORT);
1487 }
1488 } // namespace