• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 #include <thread>
18 
19 #include "db_constant.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "kv_store_nb_delegate.h"
23 #include "kv_virtual_device.h"
24 #include "platform_specific.h"
25 #include "query.h"
26 #include "query_sync_object.h"
27 #include "single_ver_data_sync.h"
28 #include "single_ver_serialize_manager.h"
29 #include "subscribe_manager.h"
30 #include "sync_types.h"
31 
32 using namespace testing::ext;
33 using namespace DistributedDB;
34 using namespace DistributedDBUnitTest;
35 using namespace std;
36 
37 namespace {
38     string g_testDir;
39     const string SCHEMA_STORE_ID = "kv_store_sync_schema_test";
40     const std::string DEVICE_A = "deviceA";
41     const std::string DEVICE_B = "deviceB";
42     const std::string DEVICE_C = "deviceC";
43 
44     KvStoreDelegateManager g_schemaMgr(SCHEMA_APP_ID, USER_ID);
45     KvStoreConfig g_config;
46     DistributedDBToolsUnitTest g_tool;
47     DBStatus g_schemaKvDelegateStatus = INVALID_ARGS;
48     KvStoreNbDelegate* g_schemaKvDelegatePtr = nullptr;
49     VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
50     KvVirtualDevice* g_deviceB = nullptr;
51     KvVirtualDevice* g_deviceC = nullptr;
52 
53     // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
54     auto g_schemaKvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
55         placeholders::_1, placeholders::_2, std::ref(g_schemaKvDelegateStatus), std::ref(g_schemaKvDelegatePtr));
56     const string SCHEMA_STRING =
57     "{\"SCHEMA_VERSION\":\"1.0\","
58     "\"SCHEMA_MODE\":\"STRICT\","
59     "\"SCHEMA_DEFINE\":{"
60     "\"field_name1\":\"BOOL\","
61     "\"field_name2\":\"BOOL\","
62     "\"field_name3\":\"INTEGER, NOT NULL\","
63     "\"field_name4\":\"LONG, DEFAULT 100\","
64     "\"field_name5\":\"DOUBLE, NOT NULL, DEFAULT 3.14\","
65     "\"field_name6\":\"STRING, NOT NULL, DEFAULT '3.1415'\","
66     "\"field_name7\":\"LONG, DEFAULT 100\","
67     "\"field_name8\":\"LONG, DEFAULT 100\","
68     "\"field_name9\":\"LONG, DEFAULT 100\","
69     "\"field_name10\":\"LONG, DEFAULT 100\""
70     "},"
71     "\"SCHEMA_INDEXES\":[\"$.field_name1\", \"$.field_name2\"]}";
72 
73     const std::string SCHEMA_VALUE1 =
74     "{\"field_name1\":true,"
75     "\"field_name2\":false,"
76     "\"field_name3\":10,"
77     "\"field_name4\":20,"
78     "\"field_name5\":3.14,"
79     "\"field_name6\":\"3.1415\","
80     "\"field_name7\":100,"
81     "\"field_name8\":100,"
82     "\"field_name9\":100,"
83     "\"field_name10\":100}";
84 
85     const std::string SCHEMA_VALUE2 =
86     "{\"field_name1\":false,"
87     "\"field_name2\":true,"
88     "\"field_name3\":100,"
89     "\"field_name4\":200,"
90     "\"field_name5\":3.14,"
91     "\"field_name6\":\"3.1415\","
92     "\"field_name7\":100,"
93     "\"field_name8\":100,"
94     "\"field_name9\":100,"
95     "\"field_name10\":100}";
96 }
97 
98 class DistributedDBSingleVerP2PSubscribeSyncTest : public testing::Test {
99 public:
100     static void SetUpTestCase(void);
101     static void TearDownTestCase(void);
102     void SetUp();
103     void TearDown();
104 };
105 
SetUpTestCase(void)106 void DistributedDBSingleVerP2PSubscribeSyncTest::SetUpTestCase(void)
107 {
108     /**
109      * @tc.setup: Init datadir and Virtual Communicator.
110      */
111     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
112     g_config.dataDir = g_testDir;
113     g_schemaMgr.SetKvStoreConfig(g_config);
114 
115     string dir = g_testDir + "/single_ver";
116     DIR* dirTmp = opendir(dir.c_str());
117     if (dirTmp == nullptr) {
118         OS::MakeDBDirectory(dir);
119     } else {
120         closedir(dirTmp);
121     }
122 
123     g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
124     ASSERT_TRUE(g_communicatorAggregator != nullptr);
125     RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
126 }
127 
TearDownTestCase(void)128 void DistributedDBSingleVerP2PSubscribeSyncTest::TearDownTestCase(void)
129 {
130     /**
131      * @tc.teardown: Release virtual Communicator and clear data dir.
132      */
133     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
134         LOGE("rm test db files error!");
135     }
136     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
137 }
138 
SetUp(void)139 void DistributedDBSingleVerP2PSubscribeSyncTest::SetUp(void)
140 {
141     DistributedDBToolsUnitTest::PrintTestCaseInfo();
142     /**
143      * @tc.setup: create virtual device B and get a KvStoreNbDelegate as deviceA
144      */
145     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
146     ASSERT_TRUE(g_deviceB != nullptr);
147     VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
148     ASSERT_TRUE(syncInterfaceB != nullptr);
149     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
150     g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
151     ASSERT_TRUE(g_deviceC != nullptr);
152     VirtualSingleVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
153     ASSERT_TRUE(syncInterfaceC != nullptr);
154     ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
155 }
156 
TearDown(void)157 void DistributedDBSingleVerP2PSubscribeSyncTest::TearDown(void)
158 {
159     /**
160      * @tc.teardown: Release device A, B
161      */
162     if (g_schemaKvDelegatePtr != nullptr) {
163         ASSERT_EQ(g_schemaMgr.CloseKvStore(g_schemaKvDelegatePtr), OK);
164         g_schemaKvDelegatePtr = nullptr;
165         DBStatus status = g_schemaMgr.DeleteKvStore(SCHEMA_STORE_ID);
166         LOGD("delete kv store status %d", status);
167         ASSERT_TRUE(status == OK);
168     }
169     if (g_deviceB != nullptr) {
170         delete g_deviceB;
171         g_deviceB = nullptr;
172     }
173     if (g_deviceC != nullptr) {
174         delete g_deviceC;
175         g_deviceC = nullptr;
176     }
177     PermissionCheckCallbackV2 nullCallback;
178     EXPECT_EQ(g_schemaMgr.SetPermissionCheckCallback(nullCallback), OK);
179 }
180 
InitSubSchemaDb()181 void InitSubSchemaDb()
182 {
183     g_config.dataDir = g_testDir;
184     g_schemaMgr.SetKvStoreConfig(g_config);
185     KvStoreNbDelegate::Option option;
186     option.schema = SCHEMA_STRING;
187     g_schemaMgr.GetKvStore(SCHEMA_STORE_ID, option, g_schemaKvDelegateCallback);
188     ASSERT_TRUE(g_schemaKvDelegateStatus == OK);
189     ASSERT_TRUE(g_schemaKvDelegatePtr != nullptr);
190 }
191 
CheckUnFinishedMap(uint32_t sizeA,uint32_t sizeB,std::vector<std::string> & deviceAQueies,std::vector<std::string> & deviceBQueies,SubscribeManager & subManager)192 void CheckUnFinishedMap(uint32_t sizeA, uint32_t sizeB, std::vector<std::string> &deviceAQueies,
193     std::vector<std::string> &deviceBQueies, SubscribeManager &subManager)
194 {
195     std::map<std::string, std::vector<QuerySyncObject>> allSyncQueries;
196     subManager.GetAllUnFinishSubQueries(allSyncQueries);
197     ASSERT_TRUE(allSyncQueries[DEVICE_A].size() == sizeA);
198     ASSERT_TRUE(allSyncQueries[DEVICE_B].size() == sizeB);
199     for (auto &item : allSyncQueries[DEVICE_A]) {
200         std::string queryId = item.GetIdentify();
201         ASSERT_TRUE(std::find(deviceAQueies.begin(), deviceAQueies.end(), queryId) != deviceAQueies.end());
202     }
203     for (auto &item : allSyncQueries[DEVICE_B]) {
204         std::string queryId = item.GetIdentify();
205         ASSERT_TRUE(std::find(deviceBQueies.begin(), deviceBQueies.end(), queryId) != deviceBQueies.end());
206     }
207 }
208 
InitLocalSubscribeMap(QuerySyncObject & queryCommonObj,std::map<std::string,QuerySyncObject> & queryMap,std::vector<std::string> & deviceAQueies,std::vector<std::string> & deviceBQueies,SubscribeManager & subManager)209 void InitLocalSubscribeMap(QuerySyncObject &queryCommonObj, std::map<std::string, QuerySyncObject> &queryMap,
210     std::vector<std::string> &deviceAQueies, std::vector<std::string> &deviceBQueies, SubscribeManager &subManager)
211 {
212     ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
213     ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
214     ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_B, queryCommonObj) == E_OK);
215     ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_B, queryCommonObj) == E_OK);
216     queryMap[queryCommonObj.GetIdentify()] = queryCommonObj;
217     deviceAQueies.push_back(queryCommonObj.GetIdentify());
218     deviceBQueies.push_back(queryCommonObj.GetIdentify());
219     for (int i = 0; i < 3; i++) { // 3 subscribe
220         QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
221         deviceAQueies.push_back(querySyncObj.GetIdentify());
222         queryMap[querySyncObj.GetIdentify()] = querySyncObj;
223         ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_A, querySyncObj) == E_OK);
224         ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_A, querySyncObj) == E_OK);
225     }
226     for (int i = 0; i < 1; i++) {
227         QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('b' + i)}));
228         deviceBQueies.push_back(querySyncObj.GetIdentify());
229         queryMap[querySyncObj.GetIdentify()] = querySyncObj;
230         ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_B, querySyncObj) == E_OK);
231         ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_B, querySyncObj) == E_OK);
232     }
233 }
234 /**
235  * @tc.name: SubscribeRequestTest001
236  * @tc.desc: test Serialize/DoSerialize SubscribeRequest
237  * @tc.type: FUNC
238  * @tc.require: AR000FN6G9
239  * @tc.author: zhuwentao
240  */
241 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeRequestTest001, TestSize.Level1)
242 {
243     /**
244      * @tc.steps: step1. prepare a SubscribeRequest.
245      */
246     auto packet = new (std::nothrow) SubscribeRequest;
247     ASSERT_TRUE(packet != nullptr);
248     packet->SetPacketHead(100, SOFTWARE_VERSION_CURRENT, SUBSCRIBE_QUERY_CMD, 1);
249     Query query = Query::Select().EqualTo("$.field_name1", 1);
250     QuerySyncObject syncQuery(query);
251     packet->SetQuery(syncQuery);
252 
253     /**
254      * @tc.steps: step2. put the SubscribeRequest Packet into a message.
255      */
256     Message msg;
257     msg.SetExternalObject(packet);
258     msg.SetMessageId(CONTROL_SYNC_MESSAGE);
259     msg.SetMessageType(TYPE_REQUEST);
260 
261     /**
262      * @tc.steps: step3. Serialization the message to a buffer.
263      */
264     int len = SingleVerSerializeManager::CalculateLen(&msg);
265     LOGE("test leng = %d", len);
266     uint8_t *buffer = new (nothrow) uint8_t[len];
267     ASSERT_TRUE(buffer != nullptr);
268     ASSERT_EQ(SingleVerSerializeManager::Serialization(buffer, len, &msg), E_OK);
269 
270     /**
271      * @tc.steps: step4. DeSerialization the buffer to a message.
272      */
273     Message outMsg;
274     outMsg.SetMessageId(CONTROL_SYNC_MESSAGE);
275     outMsg.SetMessageType(TYPE_REQUEST);
276     ASSERT_EQ(SingleVerSerializeManager::DeSerialization(buffer, len, &outMsg), E_OK);
277 
278     /**
279      * @tc.steps: step5. checkout the outMsg.
280      * @tc.expected: step5. outMsg equal the the in msg
281      */
282     auto outPacket = outMsg.GetObject<SubscribeRequest>();
283     EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
284     EXPECT_EQ(outPacket->GetSendCode(), 100);
285     EXPECT_EQ(outPacket->GetcontrolCmdType(), SUBSCRIBE_QUERY_CMD);
286     EXPECT_EQ(outPacket->GetFlag(), 1u);
287     EXPECT_EQ(outPacket->GetQuery().GetIdentify(), syncQuery.GetIdentify());
288     delete[] buffer;
289 }
290 
291 /**
292  * @tc.name: ControlAckTest001
293  * @tc.desc: test Serialize/DoSerialize ControlAckPacket
294  * @tc.type: FUNC
295  * @tc.require: AR000FN6G9
296  * @tc.author: zhuwentao
297  */
298 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, ControlAckTest001, TestSize.Level1)
299 {
300     /**
301      * @tc.steps: step1. prepare a ControlAckPacket.
302      */
303     ControlAckPacket packet;
304     packet.SetPacketHead(-E_NOT_SUPPORT, SOFTWARE_VERSION_CURRENT, SUBSCRIBE_QUERY_CMD, 1);
305 
306     /**
307      * @tc.steps: step2. put the QuerySyncAckPacket into a message.
308      */
309     Message msg;
310     msg.SetCopiedObject(packet);
311     msg.SetMessageId(CONTROL_SYNC_MESSAGE);
312     msg.SetMessageType(TYPE_RESPONSE);
313 
314     /**
315      * @tc.steps: step3. Serialization the message to a buffer.
316      */
317     int len = SingleVerSerializeManager::CalculateLen(&msg);
318     LOGE("test leng = %d", len);
319     uint8_t *buffer = new (nothrow) uint8_t[len];
320     ASSERT_TRUE(buffer != nullptr);
321     int errCode = SingleVerSerializeManager::Serialization(buffer, len, &msg);
322     ASSERT_EQ(errCode, E_OK);
323 
324     /**
325      * @tc.steps: step4. DeSerialization the buffer to a message.
326      */
327     Message outMsg;
328     outMsg.SetMessageId(CONTROL_SYNC_MESSAGE);
329     outMsg.SetMessageType(TYPE_RESPONSE);
330     errCode = SingleVerSerializeManager::DeSerialization(buffer, len, &outMsg);
331     ASSERT_EQ(errCode, E_OK);
332 
333     /**
334      * @tc.steps: step5. checkout the outMsg.
335      * @tc.expected: step5. outMsg equal the the in msg
336      */
337     auto outPacket = outMsg.GetObject<ControlAckPacket>();
338     EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
339     EXPECT_EQ(outPacket->GetRecvCode(), -E_NOT_SUPPORT);
340     EXPECT_EQ(outPacket->GetcontrolCmdType(), SUBSCRIBE_QUERY_CMD);
341     EXPECT_EQ(outPacket->GetFlag(), 1u);
342     delete[] buffer;
343 }
344 
345 /**
346  * @tc.name: subscribeManager001
347  * @tc.desc: test subscribe class subscribe local function with one device
348  * @tc.type: FUNC
349  * @tc.require: AR000FN6G9
350  * @tc.author: zhuwentao
351  */
352 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeManager001, TestSize.Level1)
353 {
354     SubscribeManager subManager;
355     std::string device = "device_A";
356     /**
357      * @tc.steps: step1. test one device limit four subscribe queries in local map
358      */
359     LOGI("============step 1============");
360     for (int i = 0; i < 4; i++) {
361         QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
362         ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj) == E_OK);
363         ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device, querySyncObj) == E_OK);
364     }
365     std::vector<QuerySyncObject> subscribeQueries;
366     subManager.GetLocalSubscribeQueries(device, subscribeQueries);
367     ASSERT_TRUE(subscribeQueries.size() == 4);
368     subscribeQueries.clear();
369     QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 4)}));
370     int errCode = subManager.ReserveLocalSubscribeQuery(device, querySyncObj1);
371     ASSERT_TRUE(errCode != E_OK);
372     /**
373      * @tc.steps: step2. allow to subscribe existed query
374      */
375     LOGI("============step 2============");
376     QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 3)}));
377     ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
378     ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
379     subManager.GetLocalSubscribeQueries(device, subscribeQueries);
380     ASSERT_TRUE(subscribeQueries.size() == 4);
381     subscribeQueries.clear();
382     /**
383      * @tc.steps: step3. unsubscribe no existed queries
384      */
385     LOGI("============step 3============");
386     subManager.RemoveLocalSubscribeQuery(device, querySyncObj1);
387     subManager.GetLocalSubscribeQueries(device, subscribeQueries);
388     ASSERT_TRUE(subscribeQueries.size() == 4);
389     subscribeQueries.clear();
390     /**
391      * @tc.steps: step4. unsubscribe queries
392      */
393     LOGI("============step 4============");
394     for (int i = 0; i < 4; i++) {
395         QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
396         subManager.RemoveLocalSubscribeQuery(device, querySyncObj);
397     }
398     subManager.GetLocalSubscribeQueries(device, subscribeQueries);
399     ASSERT_TRUE(subscribeQueries.size() == 0);
400 
401     /**
402      * @tc.steps: step5. reserve twice while subscribe queries
403      */
404     LOGI("============step 5============");
405     ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
406     ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
407     ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
408     subManager.GetLocalSubscribeQueries(device, subscribeQueries);
409     ASSERT_TRUE(subscribeQueries.size() == 1);
410     subscribeQueries.clear();
411     subManager.RemoveLocalSubscribeQuery(device, querySyncObj2);
412     subManager.GetLocalSubscribeQueries(device, subscribeQueries);
413     ASSERT_TRUE(subscribeQueries.size() == 0);
414 }
415 
416 /**
417  * @tc.name: subscribeManager002
418  * @tc.desc: test subscribe class subscribe remote function with one device
419  * @tc.type: FUNC
420  * @tc.require: AR000FN6G9
421  * @tc.author: zhuwentao
422  */
423 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeManager002, TestSize.Level1)
424 {
425     SubscribeManager subManager;
426     std::string device = "device_A";
427     /**
428      * @tc.steps: step1. test one device limit four subscribe queries in remote map
429      */
430     LOGI("============step 1============");
431     for (int i = 0; i < 4; i++) {
432         QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
433         ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device, querySyncObj) == E_OK);
434         ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device, querySyncObj) == E_OK);
435     }
436     QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 4)}));
437     ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device, querySyncObj1) != E_OK);
438     std::vector<std::string> subscribeQueryId;
439     subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
440 ASSERT_TRUE(subscribeQueryId.size() == 4);
441     subscribeQueryId.clear();
442     /**
443      * @tc.steps: step2. allow to subscribe existed query
444      */
445     LOGI("============step 2============");
446     QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 3)}));
447     ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device, querySyncObj2) == E_OK);
448     ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device, querySyncObj2) == E_OK);
449     subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
450     ASSERT_TRUE(subscribeQueryId.size() == 4);
451     subscribeQueryId.clear();
452     /**
453      * @tc.steps: step3. unsubscribe no existed queries
454      */
455     LOGI("============step 3============");
456     subManager.RemoveRemoteSubscribeQuery(device, querySyncObj1);
457     subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
458     ASSERT_TRUE(subscribeQueryId.size() == 4);
459     subscribeQueryId.clear();
460     /**
461      * @tc.steps: step4. unsubscribe queries
462      */
463     LOGI("============step 4============");
464     for (int i = 0; i < 4; i++) {
465         QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
466         subManager.RemoveRemoteSubscribeQuery(device, querySyncObj);
467     }
468     subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
469     ASSERT_TRUE(subscribeQueryId.size() == 0);
470 }
471 
472 /**
473  * @tc.name: subscribeManager003
474  * @tc.desc: test subscribe class subscribe remote function with multi device
475  * @tc.type: FUNC
476  * @tc.require: AR000FN6G9
477  * @tc.author: zhuwentao
478  */
479 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeManager003, TestSize.Level1)
480 {
481     SubscribeManager subManager;
482     std::string device = "device_";
483     std::vector<QuerySyncObject> subscribeQueries;
484     /**
485      * @tc.steps: step1. test mutil device limit 32 devices in remote map and check each device has one subscribe
486      */
487     LOGI("============step 1============");
488     QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 1)}));
489     for (int i = 0; i < 32; i++) {
490         ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
491         ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
492     }
493     ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(33), querySyncObj) != E_OK);
494     for (int i = 0; i < 32; i++) {
495         subManager.GetLocalSubscribeQueries(device + std::to_string(i), subscribeQueries);
496         ASSERT_TRUE(subscribeQueries.size() == 1);
497         subscribeQueries.clear();
498     }
499     /**
500      * @tc.steps: step2. clear remote subscribe query map and check each device has no subscribe
501      */
502     LOGI("============step 2============");
503     for (int i = 0; i < 32; i++) {
504         subManager.ClearLocalSubscribeQuery(device + std::to_string(i));
505         subManager.GetLocalSubscribeQueries(device + std::to_string(i), subscribeQueries);
506         ASSERT_TRUE(subscribeQueries.size() == 0);
507         subscribeQueries.clear();
508     }
509     /**
510      * @tc.steps: step3. test mutil device limit 8 queries in db and check each device has one subscribe
511      */
512     LOGI("============step 3============");
513     for (int i = 0; i < 8; i++) {
514         QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
515         ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
516         ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
517     }
518     QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 8)}));
519     ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(8), querySyncObj1) != E_OK);
520 }
521 
522 /**
523  * @tc.name: subscribeManager004
524  * @tc.desc: test subscribe class subscribe remote function with multi device
525  * @tc.type: FUNC
526  * @tc.require: AR000FN6G9
527  * @tc.author: zhuwentao
528  */
529 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeManager004, TestSize.Level1)
530 {
531     SubscribeManager subManager;
532     std::string device = "device_";
533     std::vector<std::string> subscribeQueryId;
534     /**
535      * @tc.steps: step1. test mutil device limit 32 devices in remote map and check each device has one subscribe
536      */
537     LOGI("============step 1============");
538     QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 1)}));
539     for (int i = 0; i < 32; i++) {
540         ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
541         ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
542     }
543     ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(33), querySyncObj) != E_OK);
544     for (int i = 0; i < 32; i++) {
545         subManager.GetRemoteSubscribeQueryIds(device + std::to_string(i), subscribeQueryId);
546         ASSERT_TRUE(subscribeQueryId.size() == 1);
547         subscribeQueryId.clear();
548     }
549     /**
550      * @tc.steps: step2. clear remote subscribe query map and check each device has no subscribe
551      */
552     LOGI("============step 2============");
553     for (int i = 0; i < 32; i++) {
554         subManager.ClearRemoteSubscribeQuery(device + std::to_string(i));
555         subManager.GetRemoteSubscribeQueryIds(device + std::to_string(i), subscribeQueryId);
556         ASSERT_TRUE(subscribeQueryId.size() == 0);
557         subscribeQueryId.clear();
558     }
559     subManager.ClearRemoteSubscribeQuery(device);
560     /**
561      * @tc.steps: step3. test mutil device limit 8 queries in db and check each device has one subscribe
562      */
563     LOGI("============step 3============");
564     for (int i = 0; i < 8; i++) {
565         QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
566         ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
567         ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
568     }
569     QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 8)}));
570     ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(8), querySyncObj1) != E_OK);
571 }
572 
573 /**
574  * @tc.name: subscribeManager005
575  * @tc.desc: test subscribe class subscribe remote function with put into unfinished map
576  * @tc.type: FUNC
577  * @tc.require: AR000FN6G9
578  * @tc.author: zhuwentao
579  */
580 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeManager005, TestSize.Level1)
581 {
582     SubscribeManager subManager;
583     std::vector<QuerySyncObject> subscribeQueries;
584     std::map<std::string, QuerySyncObject> queryMap;
585     std::vector<std::string> deviceAQueies;
586     std::vector<std::string> deviceBQueies;
587     QuerySyncObject queryCommonObj(Query::Select().PrefixKey({'a'}));
588     /**
589      * @tc.steps: step1. test one devices has 4 subscribes and another has 2 in local map, put into unfinished map
590      */
591     LOGI("============step 1============");
592     InitLocalSubscribeMap(queryCommonObj, queryMap, deviceAQueies, deviceBQueies, subManager);
593     /**
594      * @tc.steps: step2. check all device unFinished subscribe queries and put into unfinished map
595      */
596     LOGI("============step 2============");
597     subManager.GetLocalSubscribeQueries(DEVICE_A, subscribeQueries);
598     ASSERT_TRUE(subscribeQueries.size() == 4);
599     subManager.PutLocalUnFiniedSubQueries(DEVICE_A, subscribeQueries);
600     subscribeQueries.clear();
601     subManager.GetLocalSubscribeQueries(DEVICE_B, subscribeQueries);
602     ASSERT_TRUE(subscribeQueries.size() == 2);
603     subManager.PutLocalUnFiniedSubQueries(DEVICE_B, subscribeQueries);
604     subscribeQueries.clear();
605     /**
606      * @tc.steps: step3. get all device unFinished subscribe queries and check
607      */
608     LOGI("============step 3============");
609     CheckUnFinishedMap(4, 2, deviceAQueies, deviceBQueies, subManager);
610     /**
611      * @tc.steps: step4. active some subscribe queries
612      */
613     LOGI("============step 4============");
614     subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryCommonObj);
615     subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryMap[deviceAQueies[3]]);
616     subManager.ActiveLocalSubscribeQuery(DEVICE_B, queryMap[deviceBQueies[1]]);
617     deviceAQueies.erase(deviceAQueies.begin() + 3);
618     deviceAQueies.erase(deviceAQueies.begin());
619     queryMap.erase(queryMap[deviceBQueies[1]].GetIdentify());
620     deviceBQueies.erase(deviceBQueies.begin() + 1);
621     /**
622      * @tc.steps: step5. get all device unFinished subscribe queries and check
623      */
624     LOGI("============step 5============");
625     CheckUnFinishedMap(2, 1, deviceAQueies, deviceBQueies, subManager);
626     /**
627      * @tc.steps: step6. remove left subscribe queries
628      */
629     LOGI("============step 6============");
630     for (int i = 0; i < 2; i++) {
631         QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
632         subManager.RemoveLocalSubscribeQuery(DEVICE_A, querySyncObj);
633     }
634     subManager.RemoveLocalSubscribeQuery(DEVICE_A, queryCommonObj);
635     subManager.RemoveLocalSubscribeQuery(DEVICE_B, queryCommonObj);
636     /**
637      * @tc.steps: step7. get all device unFinished subscribe queries and check
638      */
639     LOGI("============step 7============");
640     CheckUnFinishedMap(0, 0, deviceAQueies, deviceBQueies, subManager);
641 }
642 
643 /**
644  * @tc.name: subscribeManager006
645  * @tc.desc: test exception branch of subscribe manager
646  * @tc.type: FUNC
647  * @tc.require:
648  * @tc.author: zhangshijie
649  */
650 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeManager006, TestSize.Level1)
651 {
652     /**
653      * @tc.steps: step1. active a query sync object which is not in local subscribe map
654      * @tc.expected:step1 return -E_INTERNAL_ERROR
655      */
656     SubscribeManager subManager;
657     QuerySyncObject queryCommonObj(Query::Select().PrefixKey({'a'}));
658     EXPECT_EQ(subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryCommonObj), -E_INTERNAL_ERROR);
659     subManager.DeleteLocalSubscribeQuery(DEVICE_A, queryCommonObj);
660     subManager.RemoveLocalSubscribeQuery(DEVICE_A, queryCommonObj);
661     std::vector<QuerySyncObject> subscribeQueries;
662     subManager.PutLocalUnFiniedSubQueries(DEVICE_A, subscribeQueries);
663     std::map<std::string, std::vector<QuerySyncObject>> allSyncQueries;
664     subManager.GetAllUnFinishSubQueries(allSyncQueries);
665 
666     /**
667      * @tc.steps: step2. call IsLastRemoteContainSubscribe with a device not in remote subscribe map
668      * @tc.expected: step2 return false
669      */
670     std::string queryId = "queryId";
671     EXPECT_EQ(subManager.IsLastRemoteContainSubscribe(DEVICE_A, queryId), false);
672 
673     /**
674      * @tc.steps: step3. active local subscribe with a device which is not in local subscribe map and
675      * a query sync object which is in local subscribe map
676      * @tc.expected: step3 return -E_INTERNAL_ERROR
677      */
678     std::vector<std::string> deviceAQueies;
679     std::vector<std::string> deviceBQueies;
680     std::map<std::string, QuerySyncObject> queryMap;
681     InitLocalSubscribeMap(queryCommonObj, queryMap, deviceAQueies, deviceBQueies, subManager);
682     ASSERT_TRUE(queryMap.size() > 0);
683     std::string devNotExists = "device_not_exists";
684     EXPECT_EQ(subManager.ActiveLocalSubscribeQuery(devNotExists, queryMap.begin()->second), -E_INTERNAL_ERROR);
685     QuerySyncObject queryObj(Query::Select().PrefixKey({'b'}));
686     EXPECT_EQ(subManager.ReserveLocalSubscribeQuery("test_dev", queryObj), E_OK);
687     subManager.DeleteLocalSubscribeQuery(DEVICE_A, queryObj);
688 
689     EXPECT_EQ(subManager.ActiveLocalSubscribeQuery(DEVICE_B, queryObj), -E_INTERNAL_ERROR);
690     subManager.DeleteLocalSubscribeQuery(DEVICE_A, queryCommonObj);
691     ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
692     ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
693     EXPECT_EQ(subManager.IsLastRemoteContainSubscribe(DEVICE_A, queryId), false);
694     deviceAQueies.push_back(DEVICE_A);
695     EXPECT_EQ(subManager.LocalSubscribeLimitCheck(deviceAQueies, queryCommonObj), E_OK);
696 
697     /**
698      * @tc.steps: step4. add MAX_DEVICES_NUM device, then call LocalSubscribeLimitCheck
699      * @tc.expected: step4 return -E_MAX_LIMITS
700      */
701     for (size_t i = 0 ; i < MAX_DEVICES_NUM; i++) {
702         deviceAQueies.push_back("device_" + std::to_string(i));
703     }
704     EXPECT_EQ(subManager.LocalSubscribeLimitCheck(deviceAQueies, queryCommonObj), -E_MAX_LIMITS);
705 }
706 
707 /**
708  * @tc.name: subscribeSync001
709  * @tc.desc: test subscribe normal sync
710  * @tc.type: FUNC
711  * @tc.require: AR000FN6G9
712  * @tc.author: zhuwentao
713  */
714 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeSync001, TestSize.Level1)
715 {
716     /**
717      * @tc.steps: step1. InitSchemaDb
718     */
719     LOGI("============step 1============");
720     InitSubSchemaDb();
721     DBStatus status = OK;
722     std::vector<std::string> devices;
723     devices.push_back(g_deviceB->GetDeviceId());
724     Query query = Query::Select().EqualTo("$.field_name1", 1);
725     QuerySyncObject querySyncObj(query);
726 
727     /**
728      * @tc.steps: step2. deviceB subscribe query to deviceA
729     */
730     LOGI("============step 2============");
731     g_deviceB->Subscribe(querySyncObj, true, 1);
732 
733     /**
734      * @tc.steps: step3. deviceA put {key1, SCHEMA_VALUE1} and wait 1s
735     */
736     LOGI("============step 3============");
737     Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
738     Key key = {'1'};
739     status = g_schemaKvDelegatePtr->Put(key, value);
740     EXPECT_EQ(status, OK);
741     std::this_thread::sleep_for(std::chrono::milliseconds(1000));
742     /**
743      * @tc.steps: step4. deviceB has {key11, SCHEMA_VALUE1}
744     */
745     LOGI("============step 4============");
746     VirtualDataItem item;
747     g_deviceB->GetData(key, item);
748     EXPECT_TRUE(item.value == value);
749 
750     /**
751      * @tc.steps: step5. deviceB unsubscribe query to deviceA
752     */
753     g_deviceB->UnSubscribe(querySyncObj, true, 2);
754 
755     /**
756      * @tc.steps: step5. deviceA put {key2, SCHEMA_VALUE1} and wait 1s
757     */
758     LOGI("============step 5============");
759     Value value2(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
760     Key key2 = {'2'};
761     status = g_schemaKvDelegatePtr->Put(key2, value2);
762     EXPECT_EQ(status, OK);
763     std::this_thread::sleep_for(std::chrono::milliseconds(1000));
764     /**
765      * @tc.steps: step6. deviceB don't has {key2, SCHEMA_VALUE1}
766     */
767     LOGI("============step 6============");
768     VirtualDataItem item2;
769     EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
770 }
771 
772 /**
773  * @tc.name: subscribeSync002
774  * @tc.desc: test subscribe sync over 32 devices,limit,orderBy
775  * @tc.type: FUNC
776  * @tc.require: AR000FN6G9
777  * @tc.author: zhuwentao
778  */
779 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeSync002, TestSize.Level1)
780 {
781     /**
782      * @tc.steps: step1. InitSchemaDb
783     */
784     LOGI("============step 1============");
785     InitSubSchemaDb();
786     std::vector<std::string> devices;
787     std::string device = "device_";
788     Query query = Query::Select().EqualTo("$.field_name1", 1);
789 
790     /**
791      * @tc.steps: step2. deviceA subscribe query to 33 devices, and return overlimit
792     */
793     LOGI("============step 2============");
794     for (int i = 0; i < 33; i++) {
795         devices.push_back(device + std::to_string(i));
796     }
797     EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query, true) == OVER_MAX_LIMITS);
798 
799     /**
800      * @tc.steps: step3. deviceA subscribe query with limit
801     */
802     LOGI("============step 3============");
803     devices.clear();
804     devices.push_back("device_B");
805     Query query2 = Query::Select().EqualTo("$.field_name1", 1).Limit(20, 0);
806     EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query2, true) == NOT_SUPPORT);
807 
808     /**
809      * @tc.steps: step4. deviceA subscribe query with orderBy
810     */
811     LOGI("============step 4============");
812     Query query3 = Query::Select().EqualTo("$.field_name1", 1).OrderBy("$.field_name7");
813     EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query3, true) == NOT_SUPPORT);
814 }
815 
816 /**
817  * @tc.name: subscribeSync003
818  * @tc.desc: test subscribe sync with inkeys query
819  * @tc.type: FUNC
820  * @tc.require: AR000GOHO7
821  * @tc.author: lidongwei
822  */
823 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeSync003, TestSize.Level1)
824 {
825     /**
826      * @tc.steps: step1. InitSchemaDb
827      */
828     LOGI("============step 1============");
829     InitSubSchemaDb();
830     std::vector<std::string> devices;
831     devices.push_back(g_deviceB->GetDeviceId());
832 
833     /**
834      * @tc.steps: step2. deviceB subscribe inkeys(k2k4) query to deviceA
835      */
836     LOGI("============step 2============");
837     Query query = Query::Select().InKeys({KEY_2, KEY_4});
838     g_deviceB->Subscribe(QuerySyncObject(query), true, 1);
839 
840     /**
841      * @tc.steps: step3. deviceA put k1-k5 and wait
842      */
843     LOGI("============step 3============");
844     EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
845         {KEY_1, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
846         {KEY_2, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
847         {KEY_3, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
848         {KEY_4, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
849         {KEY_5, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
850     }));
851     std::this_thread::sleep_for(std::chrono::milliseconds(500));
852 
853     /**
854      * @tc.steps: step4. deviceB has k2k4, has no k1k3k5
855      */
856     LOGI("============step 4============");
857     VirtualDataItem item;
858     EXPECT_EQ(g_deviceB->GetData(KEY_2, item), E_OK);
859     EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
860     EXPECT_EQ(g_deviceB->GetData(KEY_4, item), E_OK);
861     EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
862     EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
863     EXPECT_EQ(g_deviceB->GetData(KEY_3, item), -E_NOT_FOUND);
864     EXPECT_EQ(g_deviceB->GetData(KEY_5, item), -E_NOT_FOUND);
865 }
866 
867 /**
868  * @tc.name: subscribeSync004
869  * @tc.desc: test subscribe sync with inkeys query
870  * @tc.type: FUNC
871  * @tc.require: AR000GOHO7
872  * @tc.author: lidongwei
873  */
874 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeSync004, TestSize.Level1)
875 {
876     /**
877      * @tc.steps: step1. InitSchemaDb
878      */
879     LOGI("============step 1============");
880     InitSubSchemaDb();
881     std::vector<std::string> devices;
882     devices.push_back(g_deviceB->GetDeviceId());
883 
884     /**
885      * @tc.steps: step2. deviceB subscribe inkeys(k3k5) and equal to query to deviceA
886      */
887     LOGI("============step 2============");
888     Query query = Query::Select().InKeys({KEY_3, KEY_5}).EqualTo("$.field_name3", 100); // 100 for test.
889     g_deviceB->Subscribe(QuerySyncObject(query), true, 2);
890 
891     /**
892      * @tc.steps: step3. deviceA put k1v2,k3v2,k5v1 and wait
893      */
894     LOGI("============step 3============");
895     EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
896         {KEY_1, Value(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end())},
897         {KEY_3, Value(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end())},
898         {KEY_5, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
899     }));
900     std::this_thread::sleep_for(std::chrono::milliseconds(500));
901 
902     /**
903      * @tc.steps: step4. deviceB has k3, has no k1k5
904      */
905     LOGI("============step 4============");
906     VirtualDataItem item;
907     EXPECT_EQ(g_deviceB->GetData(KEY_3, item), E_OK);
908     EXPECT_EQ(item.value, Value(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end()));
909     EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
910     EXPECT_EQ(g_deviceB->GetData(KEY_5, item), -E_NOT_FOUND);
911 }
912 
913 /**
914  * @tc.name: subscribeSync005
915  * @tc.desc: test subscribe sync with inkeys query
916  * @tc.type: FUNC
917  * @tc.require: AR000GOHO7
918  * @tc.author: lidongwei
919  */
920 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeSync005, TestSize.Level1)
921 {
922     /**
923      * @tc.steps: step1. InitSchemaDb
924      */
925     LOGI("============step 1============");
926     InitSubSchemaDb();
927     std::vector<std::string> devices;
928     devices.push_back(g_deviceB->GetDeviceId());
929 
930     /**
931      * @tc.steps: step2. deviceB subscribe inkeys(k1, key6) and prefix key "k" query to deviceA
932      */
933     LOGI("============step 2============");
934     Key key6 { 'k', '6' };
935     Query query = Query::Select().InKeys({KEY_1, key6}).PrefixKey({ 'k' });
936     g_deviceB->Subscribe(QuerySyncObject(query), true, 3);
937 
938     /**
939      * @tc.steps: step3. deviceA put k1,key6 and wait
940      */
941     LOGI("============step 3============");
942     EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
943         {key6, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
944         {KEY_1, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
945     }));
946     std::this_thread::sleep_for(std::chrono::milliseconds(500));
947 
948     /**
949      * @tc.steps: step4. deviceB has key6, has no k1
950      */
951     LOGI("============step 4============");
952     VirtualDataItem item;
953     EXPECT_EQ(g_deviceB->GetData(key6, item), E_OK);
954     EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
955     EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
956 }
957 
958 
959 /**
960  * @tc.name: subscribeSync006
961  * @tc.desc: test one device unsubscribe no effect other device
962  * @tc.type: FUNC
963  * @tc.require: AR000GOHO7
964  * @tc.author: zhangqiquan
965  */
966 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync006, TestSize.Level1)
967 {
968     /**
969      * @tc.steps: step1. InitSchemaDb
970      */
971     LOGI("============step 1============");
972     InitSubSchemaDb();
973     std::vector<std::string> devices;
974     devices.push_back(g_deviceB->GetDeviceId());
975     devices.push_back(g_deviceC->GetDeviceId());
976 
977     /**
978      * @tc.steps: step2. deviceB unsubscribe inkeys(k1, key6) and prefix key "k" query to deviceA
979      */
980     LOGI("============step 2============");
981     Key key6 { 'k', '6' };
982     Query query = Query::Select().InKeys({KEY_1, key6}).PrefixKey({ 'k' });
983     g_deviceB->Online();
984     g_deviceC->Online();
985     g_deviceB->Subscribe(QuerySyncObject(query), true, 3);
986     g_deviceC->Subscribe(QuerySyncObject(query), true, 3);
987 
988     /**
989      * @tc.steps: step3. deviceC unsubscribe
990      */
991     LOGI("============step 3============");
992     g_deviceC->UnSubscribe(QuerySyncObject(query), true, 3);
993 
994     /**
995      * @tc.steps: step4. deviceA put k1,key6 and wait
996      */
997     LOGI("============step 4============");
998     EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
999         {key6, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
1000         {KEY_1, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
1001     }));
1002     std::this_thread::sleep_for(std::chrono::seconds(1));
1003 
1004     /**
1005      * @tc.steps: step5. deviceB has key6, has no k1
1006      */
1007     LOGI("============step 5============");
1008     VirtualDataItem item;
1009     EXPECT_EQ(g_deviceB->GetData(key6, item), E_OK);
1010     EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
1011     EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
1012 }
1013 
1014 /**
1015  * @tc.name: subscribeSync007
1016  * @tc.desc: test subscribe query with order by write time
1017  * @tc.type: FUNC
1018  * @tc.require: AR000H5VLO
1019  * @tc.author: zhuwentao
1020  */
1021 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, subscribeSync007, TestSize.Level1)
1022 {
1023     /**
1024      * @tc.steps: step1. InitSchemaDb
1025     */
1026     LOGI("============step 1============");
1027     InitSubSchemaDb();
1028     std::vector<std::string> devices = {"DEVICE_B"};
1029 
1030     /**
1031      * @tc.steps: step2. deviceA subscribe query with order by write time
1032      * * @tc.expected: step2. interface return not support
1033     */
1034     Query query = Query::Select().EqualTo("$.field_name1", 1).OrderByWriteTime(false);
1035     EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query, true) == NOT_SUPPORT);
1036     EXPECT_TRUE(g_schemaKvDelegatePtr->UnSubscribeRemoteQuery(devices, nullptr, query, true) == NOT_SUPPORT);
1037 }
1038