1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17 #include <thread>
18
19 #include "db_constant.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "kv_store_nb_delegate.h"
23 #include "kv_virtual_device.h"
24 #include "platform_specific.h"
25 #include "query.h"
26 #include "query_sync_object.h"
27 #include "single_ver_data_sync.h"
28 #include "single_ver_serialize_manager.h"
29 #include "subscribe_manager.h"
30 #include "sync_types.h"
31
32 using namespace testing::ext;
33 using namespace DistributedDB;
34 using namespace DistributedDBUnitTest;
35 using namespace std;
36
namespace {
    // Root directory for this suite's database files; filled in by TestDirInit() in SetUpTestCase.
    string g_testDir;
    // Store id of the schema kv store opened by InitSubSchemaDb and deleted again in TearDown.
    const string SCHEMA_STORE_ID = "kv_store_sync_schema_test";
    // Device names. DEVICE_B and DEVICE_C name the virtual devices created in SetUp; DEVICE_A and
    // DEVICE_B are additionally used as plain device keys in the SubscribeManager tests.
    const std::string DEVICE_A = "deviceA";
    const std::string DEVICE_B = "deviceB";
    const std::string DEVICE_C = "deviceC";

    // Delegate manager and its configuration, shared by all tests in this file.
    KvStoreDelegateManager g_schemaMgr(SCHEMA_APP_ID, USER_ID);
    KvStoreConfig g_config;
    // NOTE(review): no use of g_tool is visible near these declarations - confirm it is still needed.
    DistributedDBToolsUnitTest g_tool;
    // Status and delegate pointer bound (by reference) into g_schemaKvDelegateCallback below;
    // InitSubSchemaDb checks both right after calling GetKvStore.
    DBStatus g_schemaKvDelegateStatus = INVALID_ARGS;
    KvStoreNbDelegate* g_schemaKvDelegatePtr = nullptr;
    // Virtual communicator installed into the RuntimeContext in SetUpTestCase.
    VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
    // Virtual remote devices, created in SetUp and deleted in TearDown.
    KvVirtualDevice* g_deviceB = nullptr;
    KvVirtualDevice* g_deviceC = nullptr;

    // g_schemaKvDelegateCallback is a bind expression callable with the two arguments passed through
    // placeholders _1 and _2 (presumably a DBStatus and a KvStoreNbDelegate* - confirm against
    // KvStoreNbDelegateCallback); the remaining two arguments are references to the globals above.
    auto g_schemaKvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
        placeholders::_1, placeholders::_2, std::ref(g_schemaKvDelegateStatus), std::ref(g_schemaKvDelegatePtr));
    // STRICT-mode schema with ten fields (two BOOL, one INTEGER, six LONG, one DOUBLE, one STRING)
    // and indexes on field_name1 and field_name2.
    const string SCHEMA_STRING =
        "{\"SCHEMA_VERSION\":\"1.0\","
        "\"SCHEMA_MODE\":\"STRICT\","
        "\"SCHEMA_DEFINE\":{"
        "\"field_name1\":\"BOOL\","
        "\"field_name2\":\"BOOL\","
        "\"field_name3\":\"INTEGER, NOT NULL\","
        "\"field_name4\":\"LONG, DEFAULT 100\","
        "\"field_name5\":\"DOUBLE, NOT NULL, DEFAULT 3.14\","
        "\"field_name6\":\"STRING, NOT NULL, DEFAULT '3.1415'\","
        "\"field_name7\":\"LONG, DEFAULT 100\","
        "\"field_name8\":\"LONG, DEFAULT 100\","
        "\"field_name9\":\"LONG, DEFAULT 100\","
        "\"field_name10\":\"LONG, DEFAULT 100\""
        "},"
        "\"SCHEMA_INDEXES\":[\"$.field_name1\", \"$.field_name2\"]}";

    // Value for SCHEMA_STRING with field_name1 = true and field_name2 = false.
    const std::string SCHEMA_VALUE1 =
        "{\"field_name1\":true,"
        "\"field_name2\":false,"
        "\"field_name3\":10,"
        "\"field_name4\":20,"
        "\"field_name5\":3.14,"
        "\"field_name6\":\"3.1415\","
        "\"field_name7\":100,"
        "\"field_name8\":100,"
        "\"field_name9\":100,"
        "\"field_name10\":100}";

    // Value for SCHEMA_STRING with field_name1 = false and field_name2 = true.
    const std::string SCHEMA_VALUE2 =
        "{\"field_name1\":false,"
        "\"field_name2\":true,"
        "\"field_name3\":100,"
        "\"field_name4\":200,"
        "\"field_name5\":3.14,"
        "\"field_name6\":\"3.1415\","
        "\"field_name7\":100,"
        "\"field_name8\":100,"
        "\"field_name9\":100,"
        "\"field_name10\":100}";
}
97
98 class DistributedDBSingleVerP2PSubscribeSyncTest : public testing::Test {
99 public:
100 static void SetUpTestCase(void);
101 static void TearDownTestCase(void);
102 void SetUp();
103 void TearDown();
104 };
105
SetUpTestCase(void)106 void DistributedDBSingleVerP2PSubscribeSyncTest::SetUpTestCase(void)
107 {
108 /**
109 * @tc.setup: Init datadir and Virtual Communicator.
110 */
111 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
112 g_config.dataDir = g_testDir;
113 g_schemaMgr.SetKvStoreConfig(g_config);
114
115 string dir = g_testDir + "/single_ver";
116 DIR* dirTmp = opendir(dir.c_str());
117 if (dirTmp == nullptr) {
118 OS::MakeDBDirectory(dir);
119 } else {
120 closedir(dirTmp);
121 }
122
123 g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
124 ASSERT_TRUE(g_communicatorAggregator != nullptr);
125 RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
126 }
127
TearDownTestCase(void)128 void DistributedDBSingleVerP2PSubscribeSyncTest::TearDownTestCase(void)
129 {
130 /**
131 * @tc.teardown: Release virtual Communicator and clear data dir.
132 */
133 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
134 LOGE("rm test db files error!");
135 }
136 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
137 }
138
SetUp(void)139 void DistributedDBSingleVerP2PSubscribeSyncTest::SetUp(void)
140 {
141 DistributedDBToolsUnitTest::PrintTestCaseInfo();
142 /**
143 * @tc.setup: create virtual device B and get a KvStoreNbDelegate as deviceA
144 */
145 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
146 ASSERT_TRUE(g_deviceB != nullptr);
147 VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
148 ASSERT_TRUE(syncInterfaceB != nullptr);
149 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
150 g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
151 ASSERT_TRUE(g_deviceC != nullptr);
152 VirtualSingleVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
153 ASSERT_TRUE(syncInterfaceC != nullptr);
154 ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
155 }
156
TearDown(void)157 void DistributedDBSingleVerP2PSubscribeSyncTest::TearDown(void)
158 {
159 /**
160 * @tc.teardown: Release device A, B
161 */
162 if (g_schemaKvDelegatePtr != nullptr) {
163 ASSERT_EQ(g_schemaMgr.CloseKvStore(g_schemaKvDelegatePtr), OK);
164 g_schemaKvDelegatePtr = nullptr;
165 DBStatus status = g_schemaMgr.DeleteKvStore(SCHEMA_STORE_ID);
166 LOGD("delete kv store status %d", status);
167 ASSERT_TRUE(status == OK);
168 }
169 if (g_deviceB != nullptr) {
170 delete g_deviceB;
171 g_deviceB = nullptr;
172 }
173 if (g_deviceC != nullptr) {
174 delete g_deviceC;
175 g_deviceC = nullptr;
176 }
177 PermissionCheckCallbackV2 nullCallback;
178 EXPECT_EQ(g_schemaMgr.SetPermissionCheckCallback(nullCallback), OK);
179 }
180
InitSubSchemaDb()181 void InitSubSchemaDb()
182 {
183 g_config.dataDir = g_testDir;
184 g_schemaMgr.SetKvStoreConfig(g_config);
185 KvStoreNbDelegate::Option option;
186 option.schema = SCHEMA_STRING;
187 g_schemaMgr.GetKvStore(SCHEMA_STORE_ID, option, g_schemaKvDelegateCallback);
188 ASSERT_TRUE(g_schemaKvDelegateStatus == OK);
189 ASSERT_TRUE(g_schemaKvDelegatePtr != nullptr);
190 }
191
CheckUnFinishedMap(uint32_t sizeA,uint32_t sizeB,std::vector<std::string> & deviceAQueies,std::vector<std::string> & deviceBQueies,SubscribeManager & subManager)192 void CheckUnFinishedMap(uint32_t sizeA, uint32_t sizeB, std::vector<std::string> &deviceAQueies,
193 std::vector<std::string> &deviceBQueies, SubscribeManager &subManager)
194 {
195 std::map<std::string, std::vector<QuerySyncObject>> allSyncQueries;
196 subManager.GetAllUnFinishSubQueries(allSyncQueries);
197 ASSERT_TRUE(allSyncQueries[DEVICE_A].size() == sizeA);
198 ASSERT_TRUE(allSyncQueries[DEVICE_B].size() == sizeB);
199 for (auto &item : allSyncQueries[DEVICE_A]) {
200 std::string queryId = item.GetIdentify();
201 ASSERT_TRUE(std::find(deviceAQueies.begin(), deviceAQueies.end(), queryId) != deviceAQueies.end());
202 }
203 for (auto &item : allSyncQueries[DEVICE_B]) {
204 std::string queryId = item.GetIdentify();
205 ASSERT_TRUE(std::find(deviceBQueies.begin(), deviceBQueies.end(), queryId) != deviceBQueies.end());
206 }
207 }
208
InitLocalSubscribeMap(QuerySyncObject & queryCommonObj,std::map<std::string,QuerySyncObject> & queryMap,std::vector<std::string> & deviceAQueies,std::vector<std::string> & deviceBQueies,SubscribeManager & subManager)209 void InitLocalSubscribeMap(QuerySyncObject &queryCommonObj, std::map<std::string, QuerySyncObject> &queryMap,
210 std::vector<std::string> &deviceAQueies, std::vector<std::string> &deviceBQueies, SubscribeManager &subManager)
211 {
212 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
213 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
214 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_B, queryCommonObj) == E_OK);
215 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_B, queryCommonObj) == E_OK);
216 queryMap[queryCommonObj.GetIdentify()] = queryCommonObj;
217 deviceAQueies.push_back(queryCommonObj.GetIdentify());
218 deviceBQueies.push_back(queryCommonObj.GetIdentify());
219 for (int i = 0; i < 3; i++) { // 3 subscribe
220 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
221 deviceAQueies.push_back(querySyncObj.GetIdentify());
222 queryMap[querySyncObj.GetIdentify()] = querySyncObj;
223 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_A, querySyncObj) == E_OK);
224 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_A, querySyncObj) == E_OK);
225 }
226 for (int i = 0; i < 1; i++) {
227 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('b' + i)}));
228 deviceBQueies.push_back(querySyncObj.GetIdentify());
229 queryMap[querySyncObj.GetIdentify()] = querySyncObj;
230 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_B, querySyncObj) == E_OK);
231 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_B, querySyncObj) == E_OK);
232 }
233 }
234 /**
235 * @tc.name: SubscribeRequestTest001
236 * @tc.desc: test Serialize/DoSerialize SubscribeRequest
237 * @tc.type: FUNC
238 * @tc.require: AR000FN6G9
239 * @tc.author: zhuwentao
240 */
241 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeRequestTest001, TestSize.Level1)
242 {
243 /**
244 * @tc.steps: step1. prepare a SubscribeRequest.
245 */
246 auto packet = new (std::nothrow) SubscribeRequest;
247 ASSERT_TRUE(packet != nullptr);
248 packet->SetPacketHead(100, SOFTWARE_VERSION_CURRENT, SUBSCRIBE_QUERY_CMD, 1);
249 Query query = Query::Select().EqualTo("$.field_name1", 1);
250 QuerySyncObject syncQuery(query);
251 packet->SetQuery(syncQuery);
252
253 /**
254 * @tc.steps: step2. put the SubscribeRequest Packet into a message.
255 */
256 Message msg;
257 msg.SetExternalObject(packet);
258 msg.SetMessageId(CONTROL_SYNC_MESSAGE);
259 msg.SetMessageType(TYPE_REQUEST);
260
261 /**
262 * @tc.steps: step3. Serialization the message to a buffer.
263 */
264 int len = SingleVerSerializeManager::CalculateLen(&msg);
265 LOGE("test leng = %d", len);
266 uint8_t *buffer = new (nothrow) uint8_t[len];
267 ASSERT_TRUE(buffer != nullptr);
268 ASSERT_EQ(SingleVerSerializeManager::Serialization(buffer, len, &msg), E_OK);
269
270 /**
271 * @tc.steps: step4. DeSerialization the buffer to a message.
272 */
273 Message outMsg;
274 outMsg.SetMessageId(CONTROL_SYNC_MESSAGE);
275 outMsg.SetMessageType(TYPE_REQUEST);
276 ASSERT_EQ(SingleVerSerializeManager::DeSerialization(buffer, len, &outMsg), E_OK);
277
278 /**
279 * @tc.steps: step5. checkout the outMsg.
280 * @tc.expected: step5. outMsg equal the the in msg
281 */
282 auto outPacket = outMsg.GetObject<SubscribeRequest>();
283 EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
284 EXPECT_EQ(outPacket->GetSendCode(), 100);
285 EXPECT_EQ(outPacket->GetcontrolCmdType(), SUBSCRIBE_QUERY_CMD);
286 EXPECT_EQ(outPacket->GetFlag(), 1u);
287 EXPECT_EQ(outPacket->GetQuery().GetIdentify(), syncQuery.GetIdentify());
288 delete[] buffer;
289 }
290
291 /**
292 * @tc.name: ControlAckTest001
293 * @tc.desc: test Serialize/DoSerialize ControlAckPacket
294 * @tc.type: FUNC
295 * @tc.require: AR000FN6G9
296 * @tc.author: zhuwentao
297 */
298 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, ControlAckTest001, TestSize.Level1)
299 {
300 /**
301 * @tc.steps: step1. prepare a ControlAckPacket.
302 */
303 ControlAckPacket packet;
304 packet.SetPacketHead(-E_NOT_SUPPORT, SOFTWARE_VERSION_CURRENT, SUBSCRIBE_QUERY_CMD, 1);
305
306 /**
307 * @tc.steps: step2. put the QuerySyncAckPacket into a message.
308 */
309 Message msg;
310 msg.SetCopiedObject(packet);
311 msg.SetMessageId(CONTROL_SYNC_MESSAGE);
312 msg.SetMessageType(TYPE_RESPONSE);
313
314 /**
315 * @tc.steps: step3. Serialization the message to a buffer.
316 */
317 int len = SingleVerSerializeManager::CalculateLen(&msg);
318 LOGE("test leng = %d", len);
319 uint8_t *buffer = new (nothrow) uint8_t[len];
320 ASSERT_TRUE(buffer != nullptr);
321 int errCode = SingleVerSerializeManager::Serialization(buffer, len, &msg);
322 ASSERT_EQ(errCode, E_OK);
323
324 /**
325 * @tc.steps: step4. DeSerialization the buffer to a message.
326 */
327 Message outMsg;
328 outMsg.SetMessageId(CONTROL_SYNC_MESSAGE);
329 outMsg.SetMessageType(TYPE_RESPONSE);
330 errCode = SingleVerSerializeManager::DeSerialization(buffer, len, &outMsg);
331 ASSERT_EQ(errCode, E_OK);
332
333 /**
334 * @tc.steps: step5. checkout the outMsg.
335 * @tc.expected: step5. outMsg equal the the in msg
336 */
337 auto outPacket = outMsg.GetObject<ControlAckPacket>();
338 EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
339 EXPECT_EQ(outPacket->GetRecvCode(), -E_NOT_SUPPORT);
340 EXPECT_EQ(outPacket->GetcontrolCmdType(), SUBSCRIBE_QUERY_CMD);
341 EXPECT_EQ(outPacket->GetFlag(), 1u);
342 delete[] buffer;
343 }
344
345 /**
346 * @tc.name: subscribeManager001
347 * @tc.desc: test subscribe class subscribe local function with one device
348 * @tc.type: FUNC
349 * @tc.require: AR000FN6G9
350 * @tc.author: zhuwentao
351 */
352 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeManager001, TestSize.Level1)
353 {
354 SubscribeManager subManager;
355 std::string device = "device_A";
356 /**
357 * @tc.steps: step1. test one device limit four subscribe queries in local map
358 */
359 LOGI("============step 1============");
360 for (int i = 0; i < 4; i++) {
361 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
362 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj) == E_OK);
363 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device, querySyncObj) == E_OK);
364 }
365 std::vector<QuerySyncObject> subscribeQueries;
366 subManager.GetLocalSubscribeQueries(device, subscribeQueries);
367 ASSERT_TRUE(subscribeQueries.size() == 4);
368 subscribeQueries.clear();
369 QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 4)}));
370 int errCode = subManager.ReserveLocalSubscribeQuery(device, querySyncObj1);
371 ASSERT_TRUE(errCode != E_OK);
372 /**
373 * @tc.steps: step2. allow to subscribe existed query
374 */
375 LOGI("============step 2============");
376 QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 3)}));
377 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
378 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
379 subManager.GetLocalSubscribeQueries(device, subscribeQueries);
380 ASSERT_TRUE(subscribeQueries.size() == 4);
381 subscribeQueries.clear();
382 /**
383 * @tc.steps: step3. unsubscribe no existed queries
384 */
385 LOGI("============step 3============");
386 subManager.RemoveLocalSubscribeQuery(device, querySyncObj1);
387 subManager.GetLocalSubscribeQueries(device, subscribeQueries);
388 ASSERT_TRUE(subscribeQueries.size() == 4);
389 subscribeQueries.clear();
390 /**
391 * @tc.steps: step4. unsubscribe queries
392 */
393 LOGI("============step 4============");
394 for (int i = 0; i < 4; i++) {
395 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
396 subManager.RemoveLocalSubscribeQuery(device, querySyncObj);
397 }
398 subManager.GetLocalSubscribeQueries(device, subscribeQueries);
399 ASSERT_TRUE(subscribeQueries.size() == 0);
400
401 /**
402 * @tc.steps: step5. reserve twice while subscribe queries
403 */
404 LOGI("============step 5============");
405 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
406 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
407 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
408 subManager.GetLocalSubscribeQueries(device, subscribeQueries);
409 ASSERT_TRUE(subscribeQueries.size() == 1);
410 subscribeQueries.clear();
411 subManager.RemoveLocalSubscribeQuery(device, querySyncObj2);
412 subManager.GetLocalSubscribeQueries(device, subscribeQueries);
413 ASSERT_TRUE(subscribeQueries.size() == 0);
414 }
415
416 /**
417 * @tc.name: subscribeManager002
418 * @tc.desc: test subscribe class subscribe remote function with one device
419 * @tc.type: FUNC
420 * @tc.require: AR000FN6G9
421 * @tc.author: zhuwentao
422 */
423 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeManager002, TestSize.Level1)
424 {
425 SubscribeManager subManager;
426 std::string device = "device_A";
427 /**
428 * @tc.steps: step1. test one device limit four subscribe queries in remote map
429 */
430 LOGI("============step 1============");
431 for (int i = 0; i < 4; i++) {
432 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
433 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device, querySyncObj) == E_OK);
434 ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device, querySyncObj) == E_OK);
435 }
436 QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 4)}));
437 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device, querySyncObj1) != E_OK);
438 std::vector<std::string> subscribeQueryId;
439 subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
440 ASSERT_TRUE(subscribeQueryId.size() == 4);
441 subscribeQueryId.clear();
442 /**
443 * @tc.steps: step2. allow to subscribe existed query
444 */
445 LOGI("============step 2============");
446 QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 3)}));
447 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device, querySyncObj2) == E_OK);
448 ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device, querySyncObj2) == E_OK);
449 subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
450 ASSERT_TRUE(subscribeQueryId.size() == 4);
451 subscribeQueryId.clear();
452 /**
453 * @tc.steps: step3. unsubscribe no existed queries
454 */
455 LOGI("============step 3============");
456 subManager.RemoveRemoteSubscribeQuery(device, querySyncObj1);
457 subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
458 ASSERT_TRUE(subscribeQueryId.size() == 4);
459 subscribeQueryId.clear();
460 /**
461 * @tc.steps: step4. unsubscribe queries
462 */
463 LOGI("============step 4============");
464 for (int i = 0; i < 4; i++) {
465 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
466 subManager.RemoveRemoteSubscribeQuery(device, querySyncObj);
467 }
468 subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
469 ASSERT_TRUE(subscribeQueryId.size() == 0);
470 }
471
472 /**
473 * @tc.name: subscribeManager003
474 * @tc.desc: test subscribe class subscribe remote function with multi device
475 * @tc.type: FUNC
476 * @tc.require: AR000FN6G9
477 * @tc.author: zhuwentao
478 */
479 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeManager003, TestSize.Level1)
480 {
481 SubscribeManager subManager;
482 std::string device = "device_";
483 std::vector<QuerySyncObject> subscribeQueries;
484 /**
485 * @tc.steps: step1. test mutil device limit 32 devices in remote map and check each device has one subscribe
486 */
487 LOGI("============step 1============");
488 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 1)}));
489 for (int i = 0; i < 32; i++) {
490 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
491 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
492 }
493 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(33), querySyncObj) != E_OK);
494 for (int i = 0; i < 32; i++) {
495 subManager.GetLocalSubscribeQueries(device + std::to_string(i), subscribeQueries);
496 ASSERT_TRUE(subscribeQueries.size() == 1);
497 subscribeQueries.clear();
498 }
499 /**
500 * @tc.steps: step2. clear remote subscribe query map and check each device has no subscribe
501 */
502 LOGI("============step 2============");
503 for (int i = 0; i < 32; i++) {
504 subManager.ClearLocalSubscribeQuery(device + std::to_string(i));
505 subManager.GetLocalSubscribeQueries(device + std::to_string(i), subscribeQueries);
506 ASSERT_TRUE(subscribeQueries.size() == 0);
507 subscribeQueries.clear();
508 }
509 /**
510 * @tc.steps: step3. test mutil device limit 8 queries in db and check each device has one subscribe
511 */
512 LOGI("============step 3============");
513 for (int i = 0; i < 8; i++) {
514 QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
515 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
516 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
517 }
518 QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 8)}));
519 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(8), querySyncObj1) != E_OK);
520 }
521
522 /**
523 * @tc.name: subscribeManager004
524 * @tc.desc: test subscribe class subscribe remote function with multi device
525 * @tc.type: FUNC
526 * @tc.require: AR000FN6G9
527 * @tc.author: zhuwentao
528 */
529 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeManager004, TestSize.Level1)
530 {
531 SubscribeManager subManager;
532 std::string device = "device_";
533 std::vector<std::string> subscribeQueryId;
534 /**
535 * @tc.steps: step1. test mutil device limit 32 devices in remote map and check each device has one subscribe
536 */
537 LOGI("============step 1============");
538 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 1)}));
539 for (int i = 0; i < 32; i++) {
540 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
541 ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
542 }
543 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(33), querySyncObj) != E_OK);
544 for (int i = 0; i < 32; i++) {
545 subManager.GetRemoteSubscribeQueryIds(device + std::to_string(i), subscribeQueryId);
546 ASSERT_TRUE(subscribeQueryId.size() == 1);
547 subscribeQueryId.clear();
548 }
549 /**
550 * @tc.steps: step2. clear remote subscribe query map and check each device has no subscribe
551 */
552 LOGI("============step 2============");
553 for (int i = 0; i < 32; i++) {
554 subManager.ClearRemoteSubscribeQuery(device + std::to_string(i));
555 subManager.GetRemoteSubscribeQueryIds(device + std::to_string(i), subscribeQueryId);
556 ASSERT_TRUE(subscribeQueryId.size() == 0);
557 subscribeQueryId.clear();
558 }
559 subManager.ClearRemoteSubscribeQuery(device);
560 /**
561 * @tc.steps: step3. test mutil device limit 8 queries in db and check each device has one subscribe
562 */
563 LOGI("============step 3============");
564 for (int i = 0; i < 8; i++) {
565 QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
566 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
567 ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
568 }
569 QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 8)}));
570 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(8), querySyncObj1) != E_OK);
571 }
572
573 /**
574 * @tc.name: subscribeManager005
575 * @tc.desc: test subscribe class subscribe remote function with put into unfinished map
576 * @tc.type: FUNC
577 * @tc.require: AR000FN6G9
578 * @tc.author: zhuwentao
579 */
580 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeManager005, TestSize.Level1)
581 {
582 SubscribeManager subManager;
583 std::vector<QuerySyncObject> subscribeQueries;
584 std::map<std::string, QuerySyncObject> queryMap;
585 std::vector<std::string> deviceAQueies;
586 std::vector<std::string> deviceBQueies;
587 QuerySyncObject queryCommonObj(Query::Select().PrefixKey({'a'}));
588 /**
589 * @tc.steps: step1. test one devices has 4 subscribes and another has 2 in local map, put into unfinished map
590 */
591 LOGI("============step 1============");
592 InitLocalSubscribeMap(queryCommonObj, queryMap, deviceAQueies, deviceBQueies, subManager);
593 /**
594 * @tc.steps: step2. check all device unFinished subscribe queries and put into unfinished map
595 */
596 LOGI("============step 2============");
597 subManager.GetLocalSubscribeQueries(DEVICE_A, subscribeQueries);
598 ASSERT_TRUE(subscribeQueries.size() == 4);
599 subManager.PutLocalUnFiniedSubQueries(DEVICE_A, subscribeQueries);
600 subscribeQueries.clear();
601 subManager.GetLocalSubscribeQueries(DEVICE_B, subscribeQueries);
602 ASSERT_TRUE(subscribeQueries.size() == 2);
603 subManager.PutLocalUnFiniedSubQueries(DEVICE_B, subscribeQueries);
604 subscribeQueries.clear();
605 /**
606 * @tc.steps: step3. get all device unFinished subscribe queries and check
607 */
608 LOGI("============step 3============");
609 CheckUnFinishedMap(4, 2, deviceAQueies, deviceBQueies, subManager);
610 /**
611 * @tc.steps: step4. active some subscribe queries
612 */
613 LOGI("============step 4============");
614 subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryCommonObj);
615 subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryMap[deviceAQueies[3]]);
616 subManager.ActiveLocalSubscribeQuery(DEVICE_B, queryMap[deviceBQueies[1]]);
617 deviceAQueies.erase(deviceAQueies.begin() + 3);
618 deviceAQueies.erase(deviceAQueies.begin());
619 queryMap.erase(queryMap[deviceBQueies[1]].GetIdentify());
620 deviceBQueies.erase(deviceBQueies.begin() + 1);
621 /**
622 * @tc.steps: step5. get all device unFinished subscribe queries and check
623 */
624 LOGI("============step 5============");
625 CheckUnFinishedMap(2, 1, deviceAQueies, deviceBQueies, subManager);
626 /**
627 * @tc.steps: step6. remove left subscribe queries
628 */
629 LOGI("============step 6============");
630 for (int i = 0; i < 2; i++) {
631 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
632 subManager.RemoveLocalSubscribeQuery(DEVICE_A, querySyncObj);
633 }
634 subManager.RemoveLocalSubscribeQuery(DEVICE_A, queryCommonObj);
635 subManager.RemoveLocalSubscribeQuery(DEVICE_B, queryCommonObj);
636 /**
637 * @tc.steps: step7. get all device unFinished subscribe queries and check
638 */
639 LOGI("============step 7============");
640 CheckUnFinishedMap(0, 0, deviceAQueies, deviceBQueies, subManager);
641 }
642
643 /**
644 * @tc.name: subscribeManager006
645 * @tc.desc: test exception branch of subscribe manager
646 * @tc.type: FUNC
647 * @tc.require:
648 * @tc.author: zhangshijie
649 */
650 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeManager006, TestSize.Level1)
651 {
652 /**
653 * @tc.steps: step1. active a query sync object which is not in local subscribe map
654 * @tc.expected:step1 return -E_INTERNAL_ERROR
655 */
656 SubscribeManager subManager;
657 QuerySyncObject queryCommonObj(Query::Select().PrefixKey({'a'}));
658 EXPECT_EQ(subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryCommonObj), -E_INTERNAL_ERROR);
659 subManager.DeleteLocalSubscribeQuery(DEVICE_A, queryCommonObj);
660 subManager.RemoveLocalSubscribeQuery(DEVICE_A, queryCommonObj);
661 std::vector<QuerySyncObject> subscribeQueries;
662 subManager.PutLocalUnFiniedSubQueries(DEVICE_A, subscribeQueries);
663 std::map<std::string, std::vector<QuerySyncObject>> allSyncQueries;
664 subManager.GetAllUnFinishSubQueries(allSyncQueries);
665
666 /**
667 * @tc.steps: step2. call IsLastRemoteContainSubscribe with a device not in remote subscribe map
668 * @tc.expected: step2 return false
669 */
670 std::string queryId = "queryId";
671 EXPECT_EQ(subManager.IsLastRemoteContainSubscribe(DEVICE_A, queryId), false);
672
673 /**
674 * @tc.steps: step3. active local subscribe with a device which is not in local subscribe map and
675 * a query sync object which is in local subscribe map
676 * @tc.expected: step3 return -E_INTERNAL_ERROR
677 */
678 std::vector<std::string> deviceAQueies;
679 std::vector<std::string> deviceBQueies;
680 std::map<std::string, QuerySyncObject> queryMap;
681 InitLocalSubscribeMap(queryCommonObj, queryMap, deviceAQueies, deviceBQueies, subManager);
682 ASSERT_TRUE(queryMap.size() > 0);
683 std::string devNotExists = "device_not_exists";
684 EXPECT_EQ(subManager.ActiveLocalSubscribeQuery(devNotExists, queryMap.begin()->second), -E_INTERNAL_ERROR);
685 QuerySyncObject queryObj(Query::Select().PrefixKey({'b'}));
686 EXPECT_EQ(subManager.ReserveLocalSubscribeQuery("test_dev", queryObj), E_OK);
687 subManager.DeleteLocalSubscribeQuery(DEVICE_A, queryObj);
688
689 EXPECT_EQ(subManager.ActiveLocalSubscribeQuery(DEVICE_B, queryObj), -E_INTERNAL_ERROR);
690 subManager.DeleteLocalSubscribeQuery(DEVICE_A, queryCommonObj);
691 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
692 ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
693 EXPECT_EQ(subManager.IsLastRemoteContainSubscribe(DEVICE_A, queryId), false);
694 deviceAQueies.push_back(DEVICE_A);
695 EXPECT_EQ(subManager.LocalSubscribeLimitCheck(deviceAQueies, queryCommonObj), E_OK);
696
697 /**
698 * @tc.steps: step4. add MAX_DEVICES_NUM device, then call LocalSubscribeLimitCheck
699 * @tc.expected: step4 return -E_MAX_LIMITS
700 */
701 for (size_t i = 0 ; i < MAX_DEVICES_NUM; i++) {
702 deviceAQueies.push_back("device_" + std::to_string(i));
703 }
704 EXPECT_EQ(subManager.LocalSubscribeLimitCheck(deviceAQueies, queryCommonObj), -E_MAX_LIMITS);
705 }
706
707 /**
708 * @tc.name: subscribeSync001
709 * @tc.desc: test subscribe normal sync
710 * @tc.type: FUNC
711 * @tc.require: AR000FN6G9
712 * @tc.author: zhuwentao
713 */
714 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeSync001, TestSize.Level1)
715 {
716 /**
717 * @tc.steps: step1. InitSchemaDb
718 */
719 LOGI("============step 1============");
720 InitSubSchemaDb();
721 DBStatus status = OK;
722 std::vector<std::string> devices;
723 devices.push_back(g_deviceB->GetDeviceId());
724 Query query = Query::Select().EqualTo("$.field_name1", 1);
725 QuerySyncObject querySyncObj(query);
726
727 /**
728 * @tc.steps: step2. deviceB subscribe query to deviceA
729 */
730 LOGI("============step 2============");
731 g_deviceB->Subscribe(querySyncObj, true, 1);
732
733 /**
734 * @tc.steps: step3. deviceA put {key1, SCHEMA_VALUE1} and wait 1s
735 */
736 LOGI("============step 3============");
737 Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
738 Key key = {'1'};
739 status = g_schemaKvDelegatePtr->Put(key, value);
740 EXPECT_EQ(status, OK);
741 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
742 /**
743 * @tc.steps: step4. deviceB has {key11, SCHEMA_VALUE1}
744 */
745 LOGI("============step 4============");
746 VirtualDataItem item;
747 g_deviceB->GetData(key, item);
748 EXPECT_TRUE(item.value == value);
749
750 /**
751 * @tc.steps: step5. deviceB unsubscribe query to deviceA
752 */
753 g_deviceB->UnSubscribe(querySyncObj, true, 2);
754
755 /**
756 * @tc.steps: step5. deviceA put {key2, SCHEMA_VALUE1} and wait 1s
757 */
758 LOGI("============step 5============");
759 Value value2(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
760 Key key2 = {'2'};
761 status = g_schemaKvDelegatePtr->Put(key2, value2);
762 EXPECT_EQ(status, OK);
763 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
764 /**
765 * @tc.steps: step6. deviceB don't has {key2, SCHEMA_VALUE1}
766 */
767 LOGI("============step 6============");
768 VirtualDataItem item2;
769 EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
770 }
771
772 /**
773 * @tc.name: subscribeSync002
774 * @tc.desc: test subscribe sync over 32 devices,limit,orderBy
775 * @tc.type: FUNC
776 * @tc.require: AR000FN6G9
777 * @tc.author: zhuwentao
778 */
779 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeSync002, TestSize.Level1)
780 {
781 /**
782 * @tc.steps: step1. InitSchemaDb
783 */
784 LOGI("============step 1============");
785 InitSubSchemaDb();
786 std::vector<std::string> devices;
787 std::string device = "device_";
788 Query query = Query::Select().EqualTo("$.field_name1", 1);
789
790 /**
791 * @tc.steps: step2. deviceA subscribe query to 33 devices, and return overlimit
792 */
793 LOGI("============step 2============");
794 for (int i = 0; i < 33; i++) {
795 devices.push_back(device + std::to_string(i));
796 }
797 EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query, true) == OVER_MAX_LIMITS);
798
799 /**
800 * @tc.steps: step3. deviceA subscribe query with limit
801 */
802 LOGI("============step 3============");
803 devices.clear();
804 devices.push_back("device_B");
805 Query query2 = Query::Select().EqualTo("$.field_name1", 1).Limit(20, 0);
806 EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query2, true) == NOT_SUPPORT);
807
808 /**
809 * @tc.steps: step4. deviceA subscribe query with orderBy
810 */
811 LOGI("============step 4============");
812 Query query3 = Query::Select().EqualTo("$.field_name1", 1).OrderBy("$.field_name7");
813 EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query3, true) == NOT_SUPPORT);
814 }
815
816 /**
817 * @tc.name: subscribeSync003
818 * @tc.desc: test subscribe sync with inkeys query
819 * @tc.type: FUNC
820 * @tc.require: AR000GOHO7
821 * @tc.author: lidongwei
822 */
823 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeSync003, TestSize.Level1)
824 {
825 /**
826 * @tc.steps: step1. InitSchemaDb
827 */
828 LOGI("============step 1============");
829 InitSubSchemaDb();
830 std::vector<std::string> devices;
831 devices.push_back(g_deviceB->GetDeviceId());
832
833 /**
834 * @tc.steps: step2. deviceB subscribe inkeys(k2k4) query to deviceA
835 */
836 LOGI("============step 2============");
837 Query query = Query::Select().InKeys({KEY_2, KEY_4});
838 g_deviceB->Subscribe(QuerySyncObject(query), true, 1);
839
840 /**
841 * @tc.steps: step3. deviceA put k1-k5 and wait
842 */
843 LOGI("============step 3============");
844 EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
845 {KEY_1, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
846 {KEY_2, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
847 {KEY_3, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
848 {KEY_4, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
849 {KEY_5, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
850 }));
851 std::this_thread::sleep_for(std::chrono::milliseconds(500));
852
853 /**
854 * @tc.steps: step4. deviceB has k2k4, has no k1k3k5
855 */
856 LOGI("============step 4============");
857 VirtualDataItem item;
858 EXPECT_EQ(g_deviceB->GetData(KEY_2, item), E_OK);
859 EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
860 EXPECT_EQ(g_deviceB->GetData(KEY_4, item), E_OK);
861 EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
862 EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
863 EXPECT_EQ(g_deviceB->GetData(KEY_3, item), -E_NOT_FOUND);
864 EXPECT_EQ(g_deviceB->GetData(KEY_5, item), -E_NOT_FOUND);
865 }
866
867 /**
868 * @tc.name: subscribeSync004
869 * @tc.desc: test subscribe sync with inkeys query
870 * @tc.type: FUNC
871 * @tc.require: AR000GOHO7
872 * @tc.author: lidongwei
873 */
874 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeSync004, TestSize.Level1)
875 {
876 /**
877 * @tc.steps: step1. InitSchemaDb
878 */
879 LOGI("============step 1============");
880 InitSubSchemaDb();
881 std::vector<std::string> devices;
882 devices.push_back(g_deviceB->GetDeviceId());
883
884 /**
885 * @tc.steps: step2. deviceB subscribe inkeys(k3k5) and equal to query to deviceA
886 */
887 LOGI("============step 2============");
888 Query query = Query::Select().InKeys({KEY_3, KEY_5}).EqualTo("$.field_name3", 100); // 100 for test.
889 g_deviceB->Subscribe(QuerySyncObject(query), true, 2);
890
891 /**
892 * @tc.steps: step3. deviceA put k1v2,k3v2,k5v1 and wait
893 */
894 LOGI("============step 3============");
895 EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
896 {KEY_1, Value(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end())},
897 {KEY_3, Value(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end())},
898 {KEY_5, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
899 }));
900 std::this_thread::sleep_for(std::chrono::milliseconds(500));
901
902 /**
903 * @tc.steps: step4. deviceB has k3, has no k1k5
904 */
905 LOGI("============step 4============");
906 VirtualDataItem item;
907 EXPECT_EQ(g_deviceB->GetData(KEY_3, item), E_OK);
908 EXPECT_EQ(item.value, Value(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end()));
909 EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
910 EXPECT_EQ(g_deviceB->GetData(KEY_5, item), -E_NOT_FOUND);
911 }
912
913 /**
914 * @tc.name: subscribeSync005
915 * @tc.desc: test subscribe sync with inkeys query
916 * @tc.type: FUNC
917 * @tc.require: AR000GOHO7
918 * @tc.author: lidongwei
919 */
920 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeSync005, TestSize.Level1)
921 {
922 /**
923 * @tc.steps: step1. InitSchemaDb
924 */
925 LOGI("============step 1============");
926 InitSubSchemaDb();
927 std::vector<std::string> devices;
928 devices.push_back(g_deviceB->GetDeviceId());
929
930 /**
931 * @tc.steps: step2. deviceB subscribe inkeys(k1, key6) and prefix key "k" query to deviceA
932 */
933 LOGI("============step 2============");
934 Key key6 { 'k', '6' };
935 Query query = Query::Select().InKeys({KEY_1, key6}).PrefixKey({ 'k' });
936 g_deviceB->Subscribe(QuerySyncObject(query), true, 3);
937
938 /**
939 * @tc.steps: step3. deviceA put k1,key6 and wait
940 */
941 LOGI("============step 3============");
942 EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
943 {key6, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
944 {KEY_1, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
945 }));
946 std::this_thread::sleep_for(std::chrono::milliseconds(500));
947
948 /**
949 * @tc.steps: step4. deviceB has key6, has no k1
950 */
951 LOGI("============step 4============");
952 VirtualDataItem item;
953 EXPECT_EQ(g_deviceB->GetData(key6, item), E_OK);
954 EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
955 EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
956 }
957
958
959 /**
960 * @tc.name: subscribeSync006
961 * @tc.desc: test one device unsubscribe no effect other device
962 * @tc.type: FUNC
963 * @tc.require: AR000GOHO7
964 * @tc.author: zhangqiquan
965 */
966 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync006, TestSize.Level1)
967 {
968 /**
969 * @tc.steps: step1. InitSchemaDb
970 */
971 LOGI("============step 1============");
972 InitSubSchemaDb();
973 std::vector<std::string> devices;
974 devices.push_back(g_deviceB->GetDeviceId());
975 devices.push_back(g_deviceC->GetDeviceId());
976
977 /**
978 * @tc.steps: step2. deviceB unsubscribe inkeys(k1, key6) and prefix key "k" query to deviceA
979 */
980 LOGI("============step 2============");
981 Key key6 { 'k', '6' };
982 Query query = Query::Select().InKeys({KEY_1, key6}).PrefixKey({ 'k' });
983 g_deviceB->Online();
984 g_deviceC->Online();
985 g_deviceB->Subscribe(QuerySyncObject(query), true, 3);
986 g_deviceC->Subscribe(QuerySyncObject(query), true, 3);
987
988 /**
989 * @tc.steps: step3. deviceC unsubscribe
990 */
991 LOGI("============step 3============");
992 g_deviceC->UnSubscribe(QuerySyncObject(query), true, 3);
993
994 /**
995 * @tc.steps: step4. deviceA put k1,key6 and wait
996 */
997 LOGI("============step 4============");
998 EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
999 {key6, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
1000 {KEY_1, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
1001 }));
1002 std::this_thread::sleep_for(std::chrono::seconds(1));
1003
1004 /**
1005 * @tc.steps: step5. deviceB has key6, has no k1
1006 */
1007 LOGI("============step 5============");
1008 VirtualDataItem item;
1009 EXPECT_EQ(g_deviceB->GetData(key6, item), E_OK);
1010 EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
1011 EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
1012 }
1013
1014 /**
1015 * @tc.name: subscribeSync007
1016 * @tc.desc: test subscribe query with order by write time
1017 * @tc.type: FUNC
1018 * @tc.require: AR000H5VLO
1019 * @tc.author: zhuwentao
1020 */
1021 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeSync007, TestSize.Level1)
1022 {
1023 /**
1024 * @tc.steps: step1. InitSchemaDb
1025 */
1026 LOGI("============step 1============");
1027 InitSubSchemaDb();
1028 std::vector<std::string> devices = {"DEVICE_B"};
1029
1030 /**
1031 * @tc.steps: step2. deviceA subscribe query with order by write time
1032 * * @tc.expected: step2. interface return not support
1033 */
1034 Query query = Query::Select().EqualTo("$.field_name1", 1).OrderByWriteTime(false);
1035 EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query, true) == NOT_SUPPORT);
1036 EXPECT_TRUE(g_schemaKvDelegatePtr->UnSubscribeRemoteQuery(devices, nullptr, query, true) == NOT_SUPPORT);
1037 }
1038