• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 #include <thread>
18 
19 #include "db_constant.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "kv_store_nb_delegate.h"
23 #include "kv_virtual_device.h"
24 #include "platform_specific.h"
25 #include "query.h"
26 #include "query_sync_object.h"
27 #include "runtime_config.h"
28 #include "single_ver_data_sync.h"
29 #include "single_ver_serialize_manager.h"
30 #include "subscribe_manager.h"
31 #include "subscribe_recorder.h"
32 #include "sync_types.h"
33 
34 using namespace testing::ext;
35 using namespace DistributedDB;
36 using namespace DistributedDBUnitTest;
37 using namespace std;
38 
39 namespace {
40     string g_testDir;
41     const string SCHEMA_STORE_ID = "kv_store_sync_schema_test";
42     const std::string DEVICE_A = "deviceA";
43     const std::string DEVICE_B = "deviceB";
44     const std::string DEVICE_C = "deviceC";
45 
46     KvStoreDelegateManager g_schemaMgr(SCHEMA_APP_ID, USER_ID);
47     KvStoreConfig g_config;
48     DistributedDBToolsUnitTest g_tool;
49     DBStatus g_schemaKvDelegateStatus = INVALID_ARGS;
50     KvStoreNbDelegate* g_schemaKvDelegatePtr = nullptr;
51     VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
52     KvVirtualDevice* g_deviceB = nullptr;
53     KvVirtualDevice* g_deviceC = nullptr;
54 
55     // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
56     auto g_schemaKvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
57         placeholders::_1, placeholders::_2, std::ref(g_schemaKvDelegateStatus), std::ref(g_schemaKvDelegatePtr));
58     const string SCHEMA_STRING =
59     "{\"SCHEMA_VERSION\":\"1.0\","
60     "\"SCHEMA_MODE\":\"STRICT\","
61     "\"SCHEMA_DEFINE\":{"
62     "\"field_name1\":\"BOOL\","
63     "\"field_name2\":\"BOOL\","
64     "\"field_name3\":\"INTEGER, NOT NULL\","
65     "\"field_name4\":\"LONG, DEFAULT 100\","
66     "\"field_name5\":\"DOUBLE, NOT NULL, DEFAULT 3.14\","
67     "\"field_name6\":\"STRING, NOT NULL, DEFAULT '3.1415'\","
68     "\"field_name7\":\"LONG, DEFAULT 100\","
69     "\"field_name8\":\"LONG, DEFAULT 100\","
70     "\"field_name9\":\"LONG, DEFAULT 100\","
71     "\"field_name10\":\"LONG, DEFAULT 100\""
72     "},"
73     "\"SCHEMA_INDEXES\":[\"$.field_name1\", \"$.field_name2\"]}";
74 
75     const std::string SCHEMA_VALUE1 =
76     "{\"field_name1\":true,"
77     "\"field_name2\":false,"
78     "\"field_name3\":10,"
79     "\"field_name4\":20,"
80     "\"field_name5\":3.14,"
81     "\"field_name6\":\"3.1415\","
82     "\"field_name7\":100,"
83     "\"field_name8\":100,"
84     "\"field_name9\":100,"
85     "\"field_name10\":100}";
86 
87     const std::string SCHEMA_VALUE2 =
88     "{\"field_name1\":false,"
89     "\"field_name2\":true,"
90     "\"field_name3\":100,"
91     "\"field_name4\":200,"
92     "\"field_name5\":3.14,"
93     "\"field_name6\":\"3.1415\","
94     "\"field_name7\":100,"
95     "\"field_name8\":100,"
96     "\"field_name9\":100,"
97     "\"field_name10\":100}";
98 
99 class DistributedDBSingleVerP2PSubscribeSyncTest : public testing::Test {
100 public:
101     static void SetUpTestCase(void);
102     static void TearDownTestCase(void);
103     void SetUp();
104     void TearDown();
105 protected:
106     static void WaitUntilNotify(KvVirtualDevice &virtualDevice);
107 };
108 
SetUpTestCase(void)109 void DistributedDBSingleVerP2PSubscribeSyncTest::SetUpTestCase(void)
110 {
111     /**
112      * @tc.setup: Init datadir and Virtual Communicator.
113      */
114     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
115     g_config.dataDir = g_testDir;
116     g_schemaMgr.SetKvStoreConfig(g_config);
117 
118     string dir = g_testDir + "/single_ver";
119     DIR* dirTmp = opendir(dir.c_str());
120     if (dirTmp == nullptr) {
121         OS::MakeDBDirectory(dir);
122     } else {
123         closedir(dirTmp);
124     }
125 
126     g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
127     ASSERT_TRUE(g_communicatorAggregator != nullptr);
128     RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
129 }
130 
TearDownTestCase(void)131 void DistributedDBSingleVerP2PSubscribeSyncTest::TearDownTestCase(void)
132 {
133     /**
134      * @tc.teardown: Release virtual Communicator and clear data dir.
135      */
136     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
137         LOGE("rm test db files error!");
138     }
139     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
140 }
141 
SetUp(void)142 void DistributedDBSingleVerP2PSubscribeSyncTest::SetUp(void)
143 {
144     DistributedDBToolsUnitTest::PrintTestCaseInfo();
145     /**
146      * @tc.setup: create virtual device B and get a KvStoreNbDelegate as deviceA
147      */
148     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
149     ASSERT_TRUE(g_deviceB != nullptr);
150     VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
151     ASSERT_TRUE(syncInterfaceB != nullptr);
152     syncInterfaceB->SetSchemaInfo(SCHEMA_STRING);
153     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
154     g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
155     ASSERT_TRUE(g_deviceC != nullptr);
156     VirtualSingleVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
157     ASSERT_TRUE(syncInterfaceC != nullptr);
158     syncInterfaceC->SetSchemaInfo(SCHEMA_STRING);
159     ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
160 }
161 
TearDown(void)162 void DistributedDBSingleVerP2PSubscribeSyncTest::TearDown(void)
163 {
164     /**
165      * @tc.teardown: Release device A, B
166      */
167     if (g_schemaKvDelegatePtr != nullptr) {
168         ASSERT_EQ(g_schemaMgr.CloseKvStore(g_schemaKvDelegatePtr), OK);
169         g_schemaKvDelegatePtr = nullptr;
170         DBStatus status = g_schemaMgr.DeleteKvStore(SCHEMA_STORE_ID);
171         LOGD("delete kv store status %d", status);
172         ASSERT_TRUE(status == OK);
173     }
174     if (g_deviceB != nullptr) {
175         delete g_deviceB;
176         g_deviceB = nullptr;
177     }
178     if (g_deviceC != nullptr) {
179         delete g_deviceC;
180         g_deviceC = nullptr;
181     }
182     PermissionCheckCallbackV2 nullCallback;
183     EXPECT_EQ(g_schemaMgr.SetPermissionCheckCallback(nullCallback), OK);
184 }
185 
WaitUntilNotify(KvVirtualDevice & virtualDevice)186 void DistributedDBSingleVerP2PSubscribeSyncTest::WaitUntilNotify(KvVirtualDevice &virtualDevice)
187 {
188     bool notify = false;
189     std::condition_variable cv;
190     std::mutex notifyMutex;
191     virtualDevice.SetPushNotifier([&notify, &cv, &notifyMutex](const std::string &) {
192         {
193             std::lock_guard<std::mutex> autoLock(notifyMutex);
194             notify = true;
195         }
196         cv.notify_all();
197     });
198     {
199         LOGI("Begin wait notify");
200         std::unique_lock<std::mutex> uniqueLock(notifyMutex);
201         (void)cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&notify]() {
202             return notify;
203         });
204         LOGI("End wait notify");
205     }
206     virtualDevice.SetPushNotifier(nullptr);
207     std::this_thread::sleep_for(std::chrono::seconds(1));
208 }
209 
InitSubSchemaDb()210 void InitSubSchemaDb()
211 {
212     g_config.dataDir = g_testDir;
213     g_schemaMgr.SetKvStoreConfig(g_config);
214     KvStoreNbDelegate::Option option;
215     option.schema = SCHEMA_STRING;
216     g_schemaMgr.GetKvStore(SCHEMA_STORE_ID, option, g_schemaKvDelegateCallback);
217     ASSERT_TRUE(g_schemaKvDelegateStatus == OK);
218     ASSERT_TRUE(g_schemaKvDelegatePtr != nullptr);
219 }
220 
CheckUnFinishedMap(uint32_t sizeA,uint32_t sizeB,std::vector<std::string> & deviceAQueies,std::vector<std::string> & deviceBQueies,SubscribeManager & subManager)221 void CheckUnFinishedMap(uint32_t sizeA, uint32_t sizeB, std::vector<std::string> &deviceAQueies,
222     std::vector<std::string> &deviceBQueies, SubscribeManager &subManager)
223 {
224     std::map<std::string, std::vector<QuerySyncObject>> allSyncQueries;
225     subManager.GetAllUnFinishSubQueries(allSyncQueries);
226     ASSERT_TRUE(allSyncQueries[DEVICE_A].size() == sizeA);
227     ASSERT_TRUE(allSyncQueries[DEVICE_B].size() == sizeB);
228     for (auto &item : allSyncQueries[DEVICE_A]) {
229         std::string queryId = item.GetIdentify();
230         ASSERT_TRUE(std::find(deviceAQueies.begin(), deviceAQueies.end(), queryId) != deviceAQueies.end());
231     }
232     for (auto &item : allSyncQueries[DEVICE_B]) {
233         std::string queryId = item.GetIdentify();
234         ASSERT_TRUE(std::find(deviceBQueies.begin(), deviceBQueies.end(), queryId) != deviceBQueies.end());
235     }
236 }
237 
InitLocalSubscribeMap(QuerySyncObject & queryCommonObj,std::map<std::string,QuerySyncObject> & queryMap,std::vector<std::string> & deviceAQueies,std::vector<std::string> & deviceBQueies,SubscribeManager & subManager)238 void InitLocalSubscribeMap(QuerySyncObject &queryCommonObj, std::map<std::string, QuerySyncObject> &queryMap,
239     std::vector<std::string> &deviceAQueies, std::vector<std::string> &deviceBQueies, SubscribeManager &subManager)
240 {
241     ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
242     ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
243     ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_B, queryCommonObj) == E_OK);
244     ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_B, queryCommonObj) == E_OK);
245     queryMap[queryCommonObj.GetIdentify()] = queryCommonObj;
246     deviceAQueies.push_back(queryCommonObj.GetIdentify());
247     deviceBQueies.push_back(queryCommonObj.GetIdentify());
248     for (int i = 0; i < 3; i++) { // 3 subscribe
249         QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
250         deviceAQueies.push_back(querySyncObj.GetIdentify());
251         queryMap[querySyncObj.GetIdentify()] = querySyncObj;
252         ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_A, querySyncObj) == E_OK);
253         ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_A, querySyncObj) == E_OK);
254     }
255     for (int i = 0; i < 1; i++) {
256         QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('b' + i)}));
257         deviceBQueies.push_back(querySyncObj.GetIdentify());
258         queryMap[querySyncObj.GetIdentify()] = querySyncObj;
259         ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_B, querySyncObj) == E_OK);
260         ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_B, querySyncObj) == E_OK);
261     }
262 }
263 /**
264  * @tc.name: SubscribeRequestTest001
265  * @tc.desc: test Serialize/DoSerialize SubscribeRequest
266  * @tc.type: FUNC
267  * @tc.require:
268  * @tc.author: zhuwentao
269  */
270 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeRequestTest001, TestSize.Level1)
271 {
272     /**
273      * @tc.steps: step1. prepare a SubscribeRequest.
274      */
275     auto packet = new (std::nothrow) SubscribeRequest;
276     ASSERT_TRUE(packet != nullptr);
277     packet->SetPacketHead(100, SOFTWARE_VERSION_CURRENT, SUBSCRIBE_QUERY_CMD, 1);
278     Query query = Query::Select().EqualTo("$.field_name1", 1);
279     QuerySyncObject syncQuery(query);
280     packet->SetQuery(syncQuery);
281 
282     /**
283      * @tc.steps: step2. put the SubscribeRequest Packet into a message.
284      */
285     Message msg;
286     msg.SetExternalObject(packet);
287     msg.SetMessageId(CONTROL_SYNC_MESSAGE);
288     msg.SetMessageType(TYPE_REQUEST);
289 
290     /**
291      * @tc.steps: step3. Serialization the message to a buffer.
292      */
293     int len = static_cast<int>(SingleVerSerializeManager::CalculateLen(&msg));
294     LOGE("test leng = %d", len);
295     uint8_t *buffer = new (nothrow) uint8_t[len];
296     ASSERT_TRUE(buffer != nullptr);
297     ASSERT_EQ(SingleVerSerializeManager::Serialization(buffer, len, &msg), E_OK);
298 
299     /**
300      * @tc.steps: step4. DeSerialization the buffer to a message.
301      */
302     Message outMsg;
303     outMsg.SetMessageId(CONTROL_SYNC_MESSAGE);
304     outMsg.SetMessageType(TYPE_REQUEST);
305     ASSERT_EQ(SingleVerSerializeManager::DeSerialization(buffer, len, &outMsg), E_OK);
306 
307     /**
308      * @tc.steps: step5. checkout the outMsg.
309      * @tc.expected: step5. outMsg equal the the in msg
310      */
311     auto outPacket = outMsg.GetObject<SubscribeRequest>();
312     EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
313     EXPECT_EQ(outPacket->GetSendCode(), 100);
314     EXPECT_EQ(outPacket->GetcontrolCmdType(), SUBSCRIBE_QUERY_CMD);
315     EXPECT_EQ(outPacket->GetFlag(), 1u);
316     EXPECT_EQ(outPacket->GetQuery().GetIdentify(), syncQuery.GetIdentify());
317     delete[] buffer;
318 }
319 
320 /**
321  * @tc.name: ControlAckTest001
322  * @tc.desc: test Serialize/DoSerialize ControlAckPacket
323  * @tc.type: FUNC
324  * @tc.require:
325  * @tc.author: zhuwentao
326  */
327 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, ControlAckTest001, TestSize.Level1)
328 {
329     /**
330      * @tc.steps: step1. prepare a ControlAckPacket.
331      */
332     ControlAckPacket packet;
333     packet.SetPacketHead(-E_NOT_SUPPORT, SOFTWARE_VERSION_CURRENT, SUBSCRIBE_QUERY_CMD, 1);
334 
335     /**
336      * @tc.steps: step2. put the QuerySyncAckPacket into a message.
337      */
338     Message msg;
339     msg.SetCopiedObject(packet);
340     msg.SetMessageId(CONTROL_SYNC_MESSAGE);
341     msg.SetMessageType(TYPE_RESPONSE);
342 
343     /**
344      * @tc.steps: step3. Serialization the message to a buffer.
345      */
346     int len = static_cast<int>(SingleVerSerializeManager::CalculateLen(&msg));
347     LOGE("test leng = %d", len);
348     uint8_t *buffer = new (nothrow) uint8_t[len];
349     ASSERT_TRUE(buffer != nullptr);
350     int errCode = SingleVerSerializeManager::Serialization(buffer, len, &msg);
351     ASSERT_EQ(errCode, E_OK);
352 
353     /**
354      * @tc.steps: step4. DeSerialization the buffer to a message.
355      */
356     Message outMsg;
357     outMsg.SetMessageId(CONTROL_SYNC_MESSAGE);
358     outMsg.SetMessageType(TYPE_RESPONSE);
359     errCode = SingleVerSerializeManager::DeSerialization(buffer, len, &outMsg);
360     ASSERT_EQ(errCode, E_OK);
361 
362     /**
363      * @tc.steps: step5. checkout the outMsg.
364      * @tc.expected: step5. outMsg equal the the in msg
365      */
366     auto outPacket = outMsg.GetObject<ControlAckPacket>();
367     EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
368     EXPECT_EQ(outPacket->GetRecvCode(), -E_NOT_SUPPORT);
369     EXPECT_EQ(outPacket->GetcontrolCmdType(), SUBSCRIBE_QUERY_CMD);
370     EXPECT_EQ(outPacket->GetFlag(), 1u);
371     delete[] buffer;
372 }
373 
374 /**
375  * @tc.name: subscribeManager001
376  * @tc.desc: test subscribe class subscribe local function with one device
377  * @tc.type: FUNC
378  * @tc.require:
379  * @tc.author: zhuwentao
380  */
381 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager001, TestSize.Level1)
382 {
383     SubscribeManager subManager;
384     std::string device = "device_A";
385     /**
386      * @tc.steps: step1. test one device limit four subscribe queries in local map
387      */
388     LOGI("============step 1============");
389     for (int i = 0; i < 4; i++) {
390         QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
391         ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj) == E_OK);
392         ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device, querySyncObj) == E_OK);
393     }
394     std::vector<QuerySyncObject> subscribeQueries;
395     subManager.GetLocalSubscribeQueries(device, subscribeQueries);
396     ASSERT_TRUE(subscribeQueries.size() == 4);
397     subscribeQueries.clear();
398     QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 4)}));
399     int errCode = subManager.ReserveLocalSubscribeQuery(device, querySyncObj1);
400     ASSERT_TRUE(errCode != E_OK);
401     /**
402      * @tc.steps: step2. allow to subscribe existed query
403      */
404     LOGI("============step 2============");
405     QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 3)}));
406     ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
407     ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
408     subManager.GetLocalSubscribeQueries(device, subscribeQueries);
409     ASSERT_TRUE(subscribeQueries.size() == 4);
410     subscribeQueries.clear();
411     /**
412      * @tc.steps: step3. unsubscribe no existed queries
413      */
414     LOGI("============step 3============");
415     subManager.RemoveLocalSubscribeQuery(device, querySyncObj1);
416     subManager.GetLocalSubscribeQueries(device, subscribeQueries);
417     ASSERT_TRUE(subscribeQueries.size() == 4);
418     subscribeQueries.clear();
419     /**
420      * @tc.steps: step4. unsubscribe queries
421      */
422     LOGI("============step 4============");
423     for (int i = 0; i < 4; i++) {
424         QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
425         subManager.RemoveLocalSubscribeQuery(device, querySyncObj);
426     }
427     subManager.GetLocalSubscribeQueries(device, subscribeQueries);
428     ASSERT_TRUE(subscribeQueries.size() == 0);
429 
430     /**
431      * @tc.steps: step5. reserve twice while subscribe queries
432      */
433     LOGI("============step 5============");
434     ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
435     ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
436     ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
437     subManager.GetLocalSubscribeQueries(device, subscribeQueries);
438     ASSERT_TRUE(subscribeQueries.size() == 1);
439     subscribeQueries.clear();
440     subManager.RemoveLocalSubscribeQuery(device, querySyncObj2);
441     subManager.GetLocalSubscribeQueries(device, subscribeQueries);
442     ASSERT_TRUE(subscribeQueries.size() == 0);
443 }
444 
445 /**
446  * @tc.name: subscribeManager002
447  * @tc.desc: test subscribe class subscribe remote function with one device
448  * @tc.type: FUNC
449  * @tc.require:
450  * @tc.author: zhuwentao
451  */
452 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager002, TestSize.Level1)
453 {
454     SubscribeManager subManager;
455     std::string device = "device_A";
456     /**
457      * @tc.steps: step1. test one device limit four subscribe queries in remote map
458      */
459     LOGI("============step 1============");
460     for (int i = 0; i < 4; i++) {
461         QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
462         ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device, querySyncObj) == E_OK);
463         ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device, querySyncObj) == E_OK);
464     }
465     QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 4)}));
466     ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device, querySyncObj1) != E_OK);
467     std::vector<std::string> subscribeQueryId;
468     subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
469 ASSERT_TRUE(subscribeQueryId.size() == 4);
470     subscribeQueryId.clear();
471     /**
472      * @tc.steps: step2. allow to subscribe existed query
473      */
474     LOGI("============step 2============");
475     QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 3)}));
476     ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device, querySyncObj2) == E_OK);
477     ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device, querySyncObj2) == E_OK);
478     subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
479     ASSERT_TRUE(subscribeQueryId.size() == 4);
480     subscribeQueryId.clear();
481     /**
482      * @tc.steps: step3. unsubscribe no existed queries
483      */
484     LOGI("============step 3============");
485     subManager.RemoveRemoteSubscribeQuery(device, querySyncObj1);
486     subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
487     ASSERT_TRUE(subscribeQueryId.size() == 4);
488     subscribeQueryId.clear();
489     /**
490      * @tc.steps: step4. unsubscribe queries
491      */
492     LOGI("============step 4============");
493     for (int i = 0; i < 4; i++) {
494         QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
495         subManager.RemoveRemoteSubscribeQuery(device, querySyncObj);
496     }
497     subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
498     ASSERT_TRUE(subscribeQueryId.size() == 0);
499 }
500 
501 /**
502  * @tc.name: subscribeManager003
503  * @tc.desc: test subscribe class subscribe remote function with multi device
504  * @tc.type: FUNC
505  * @tc.require:
506  * @tc.author: zhuwentao
507  */
508 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager003, TestSize.Level1)
509 {
510     SubscribeManager subManager;
511     std::string device = "device_";
512     std::vector<QuerySyncObject> subscribeQueries;
513     /**
514      * @tc.steps: step1. test mutil device limit 32 devices in remote map and check each device has one subscribe
515      */
516     LOGI("============step 1============");
517     QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 1)}));
518     for (int i = 0; i < 32; i++) {
519         ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
520         ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
521     }
522     ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(33), querySyncObj) != E_OK);
523     for (int i = 0; i < 32; i++) {
524         subManager.GetLocalSubscribeQueries(device + std::to_string(i), subscribeQueries);
525         ASSERT_TRUE(subscribeQueries.size() == 1);
526         subscribeQueries.clear();
527     }
528     /**
529      * @tc.steps: step2. clear remote subscribe query map and check each device has no subscribe
530      */
531     LOGI("============step 2============");
532     for (int i = 0; i < 32; i++) {
533         subManager.ClearLocalSubscribeQuery(device + std::to_string(i));
534         subManager.GetLocalSubscribeQueries(device + std::to_string(i), subscribeQueries);
535         ASSERT_TRUE(subscribeQueries.size() == 0);
536         subscribeQueries.clear();
537     }
538     /**
539      * @tc.steps: step3. test mutil device limit 8 queries in db and check each device has one subscribe
540      */
541     LOGI("============step 3============");
542     for (int i = 0; i < 8; i++) {
543         QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
544         ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
545         ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
546     }
547     QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 8)}));
548     ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(8), querySyncObj1) != E_OK);
549 }
550 
551 /**
552  * @tc.name: subscribeManager004
553  * @tc.desc: test subscribe class subscribe remote function with multi device
554  * @tc.type: FUNC
555  * @tc.require:
556  * @tc.author: zhuwentao
557  */
558 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager004, TestSize.Level1)
559 {
560     SubscribeManager subManager;
561     std::string device = "device_";
562     std::vector<std::string> subscribeQueryId;
563     /**
564      * @tc.steps: step1. test mutil device limit 32 devices in remote map and check each device has one subscribe
565      */
566     LOGI("============step 1============");
567     QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 1)}));
568     for (int i = 0; i < 32; i++) {
569         ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
570         ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
571     }
572     ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(33), querySyncObj) != E_OK);
573     for (int i = 0; i < 32; i++) {
574         subManager.GetRemoteSubscribeQueryIds(device + std::to_string(i), subscribeQueryId);
575         ASSERT_TRUE(subscribeQueryId.size() == 1);
576         subscribeQueryId.clear();
577     }
578     /**
579      * @tc.steps: step2. clear remote subscribe query map and check each device has no subscribe
580      */
581     LOGI("============step 2============");
582     for (int i = 0; i < 32; i++) {
583         subManager.ClearRemoteSubscribeQuery(device + std::to_string(i));
584         subManager.GetRemoteSubscribeQueryIds(device + std::to_string(i), subscribeQueryId);
585         ASSERT_TRUE(subscribeQueryId.size() == 0);
586         subscribeQueryId.clear();
587     }
588     subManager.ClearRemoteSubscribeQuery(device);
589     /**
590      * @tc.steps: step3. test mutil device limit 8 queries in db and check each device has one subscribe
591      */
592     LOGI("============step 3============");
593     for (int i = 0; i < 8; i++) {
594         QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
595         ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
596         ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
597     }
598     QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 8)}));
599     ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(8), querySyncObj1) != E_OK);
600 }
601 
602 /**
603  * @tc.name: subscribeManager005
604  * @tc.desc: test subscribe class subscribe remote function with put into unfinished map
605  * @tc.type: FUNC
606  * @tc.require:
607  * @tc.author: zhuwentao
608  */
609 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager005, TestSize.Level1)
610 {
611     SubscribeManager subManager;
612     std::vector<QuerySyncObject> subscribeQueries;
613     std::map<std::string, QuerySyncObject> queryMap;
614     std::vector<std::string> deviceAQueies;
615     std::vector<std::string> deviceBQueies;
616     QuerySyncObject queryCommonObj(Query::Select().PrefixKey({'a'}));
617     /**
618      * @tc.steps: step1. test one devices has 4 subscribes and another has 2 in local map, put into unfinished map
619      */
620     LOGI("============step 1============");
621     InitLocalSubscribeMap(queryCommonObj, queryMap, deviceAQueies, deviceBQueies, subManager);
622     /**
623      * @tc.steps: step2. check all device unFinished subscribe queries and put into unfinished map
624      */
625     LOGI("============step 2============");
626     subManager.GetLocalSubscribeQueries(DEVICE_A, subscribeQueries);
627     ASSERT_TRUE(subscribeQueries.size() == 4);
628     subManager.PutLocalUnFinishedSubQueries(DEVICE_A, subscribeQueries);
629     subscribeQueries.clear();
630     subManager.GetLocalSubscribeQueries(DEVICE_B, subscribeQueries);
631     ASSERT_TRUE(subscribeQueries.size() == 2);
632     subManager.PutLocalUnFinishedSubQueries(DEVICE_B, subscribeQueries);
633     subscribeQueries.clear();
634     /**
635      * @tc.steps: step3. get all device unFinished subscribe queries and check
636      */
637     LOGI("============step 3============");
638     CheckUnFinishedMap(4, 2, deviceAQueies, deviceBQueies, subManager);
639     /**
640      * @tc.steps: step4. active some subscribe queries
641      */
642     LOGI("============step 4============");
643     subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryCommonObj);
644     subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryMap[deviceAQueies[3]]);
645     subManager.ActiveLocalSubscribeQuery(DEVICE_B, queryMap[deviceBQueies[1]]);
646     deviceAQueies.erase(deviceAQueies.begin() + 3);
647     deviceAQueies.erase(deviceAQueies.begin());
648     queryMap.erase(queryMap[deviceBQueies[1]].GetIdentify());
649     deviceBQueies.erase(deviceBQueies.begin() + 1);
650     /**
651      * @tc.steps: step5. get all device unFinished subscribe queries and check
652      */
653     LOGI("============step 5============");
654     CheckUnFinishedMap(2, 1, deviceAQueies, deviceBQueies, subManager);
655     /**
656      * @tc.steps: step6. remove left subscribe queries
657      */
658     LOGI("============step 6============");
659     for (int i = 0; i < 2; i++) {
660         QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
661         subManager.RemoveLocalSubscribeQuery(DEVICE_A, querySyncObj);
662     }
663     subManager.RemoveLocalSubscribeQuery(DEVICE_A, queryCommonObj);
664     subManager.RemoveLocalSubscribeQuery(DEVICE_B, queryCommonObj);
665     /**
666      * @tc.steps: step7. get all device unFinished subscribe queries and check
667      */
668     LOGI("============step 7============");
669     CheckUnFinishedMap(0, 0, deviceAQueies, deviceBQueies, subManager);
670 }
671 
672 /**
673  * @tc.name: subscribeManager006
674  * @tc.desc: test exception branch of subscribe manager
675  * @tc.type: FUNC
676  * @tc.require:
677  * @tc.author: zhangshijie
678  */
679 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager006, TestSize.Level1)
680 {
681     /**
682      * @tc.steps: step1. active a query sync object which is not in local subscribe map
683      * @tc.expected:step1 return -E_INTERNAL_ERROR
684      */
685     SubscribeManager subManager;
686     QuerySyncObject queryCommonObj(Query::Select().PrefixKey({'a'}));
687     EXPECT_EQ(subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryCommonObj), -E_INTERNAL_ERROR);
688     subManager.DeleteLocalSubscribeQuery(DEVICE_A, queryCommonObj);
689     subManager.RemoveLocalSubscribeQuery(DEVICE_A, queryCommonObj);
690     std::vector<QuerySyncObject> subscribeQueries;
691     subManager.PutLocalUnFinishedSubQueries(DEVICE_A, subscribeQueries);
692     std::map<std::string, std::vector<QuerySyncObject>> allSyncQueries;
693     subManager.GetAllUnFinishSubQueries(allSyncQueries);
694 
695     /**
696      * @tc.steps: step2. call IsLastRemoteContainSubscribe with a device not in remote subscribe map
697      * @tc.expected: step2 return false
698      */
699     std::string queryId = "queryId";
700     EXPECT_EQ(subManager.IsLastRemoteContainSubscribe(DEVICE_A, queryId), false);
701 
702     /**
703      * @tc.steps: step3. active local subscribe with a device which is not in local subscribe map and
704      * a query sync object which is in local subscribe map
705      * @tc.expected: step3 return -E_INTERNAL_ERROR
706      */
707     std::vector<std::string> deviceAQueies;
708     std::vector<std::string> deviceBQueies;
709     std::map<std::string, QuerySyncObject> queryMap;
710     InitLocalSubscribeMap(queryCommonObj, queryMap, deviceAQueies, deviceBQueies, subManager);
711     ASSERT_TRUE(queryMap.size() > 0);
712     std::string devNotExists = "device_not_exists";
713     EXPECT_EQ(subManager.ActiveLocalSubscribeQuery(devNotExists, queryMap.begin()->second), -E_INTERNAL_ERROR);
714     QuerySyncObject queryObj(Query::Select().PrefixKey({'b'}));
715     EXPECT_EQ(subManager.ReserveLocalSubscribeQuery("test_dev", queryObj), E_OK);
716     subManager.DeleteLocalSubscribeQuery(DEVICE_A, queryObj);
717 
718     EXPECT_EQ(subManager.ActiveLocalSubscribeQuery(DEVICE_B, queryObj), -E_INTERNAL_ERROR);
719     subManager.DeleteLocalSubscribeQuery(DEVICE_A, queryCommonObj);
720     ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
721     ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
722     EXPECT_EQ(subManager.IsLastRemoteContainSubscribe(DEVICE_A, queryId), false);
723     deviceAQueies.push_back(DEVICE_A);
724     EXPECT_EQ(subManager.LocalSubscribeLimitCheck(deviceAQueies, queryCommonObj), E_OK);
725 
726     /**
727      * @tc.steps: step4. add MAX_DEVICES_NUM device, then call LocalSubscribeLimitCheck
728      * @tc.expected: step4 return -E_MAX_LIMITS
729      */
730     for (size_t i = 0 ; i < MAX_DEVICES_NUM; i++) {
731         deviceAQueies.push_back("device_" + std::to_string(i));
732     }
733     EXPECT_EQ(subManager.LocalSubscribeLimitCheck(deviceAQueies, queryCommonObj), -E_MAX_LIMITS);
734 }
735 
736 /**
737  * @tc.name: subscribeSync001
738  * @tc.desc: test subscribe normal sync
739  * @tc.type: FUNC
740  * @tc.require:
741  * @tc.author: zhuwentao
742  */
743 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync001, TestSize.Level1)
744 {
745     /**
746      * @tc.steps: step1. InitSchemaDb
747     */
748     LOGI("============step 1============");
749     InitSubSchemaDb();
750     DBStatus status = OK;
751     std::vector<std::string> devices;
752     devices.push_back(g_deviceB->GetDeviceId());
753     Query query = Query::Select().EqualTo("$.field_name1", 1);
754     QuerySyncObject querySyncObj(query);
755 
756     /**
757      * @tc.steps: step2. deviceB subscribe query to deviceA
758     */
759     LOGI("============step 2============");
760     g_deviceB->Subscribe(querySyncObj, true, 1);
761 
762     /**
763      * @tc.steps: step3. deviceA put {key1, SCHEMA_VALUE1} and wait 1s
764     */
765     LOGI("============step 3============");
766     Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
767     Key key = {'1'};
768     status = g_schemaKvDelegatePtr->Put(key, value);
769     EXPECT_EQ(status, OK);
770     WaitUntilNotify(*g_deviceB);
771     /**
772      * @tc.steps: step4. deviceB has {key11, SCHEMA_VALUE1}
773     */
774     LOGI("============step 4============");
775     VirtualDataItem item;
776     g_deviceB->GetData(key, item);
777     EXPECT_TRUE(item.value == value);
778 
779     /**
780      * @tc.steps: step5. deviceB unsubscribe query to deviceA
781     */
782     g_deviceB->UnSubscribe(querySyncObj, true, 2);
783 
784     /**
785      * @tc.steps: step5. deviceA put {key2, SCHEMA_VALUE1} and wait 1s
786     */
787     LOGI("============step 5============");
788     Value value2(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
789     Key key2 = {'2'};
790     status = g_schemaKvDelegatePtr->Put(key2, value2);
791     EXPECT_EQ(status, OK);
792     WaitUntilNotify(*g_deviceB);
793     /**
794      * @tc.steps: step6. deviceB don't has {key2, SCHEMA_VALUE1}
795     */
796     LOGI("============step 6============");
797     VirtualDataItem item2;
798     EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
799 }
800 
801 /**
802  * @tc.name: subscribeSync002
803  * @tc.desc: test subscribe sync over 32 devices,limit,orderBy
804  * @tc.type: FUNC
805  * @tc.require:
806  * @tc.author: zhuwentao
807  */
808 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync002, TestSize.Level1)
809 {
810     /**
811      * @tc.steps: step1. InitSchemaDb
812     */
813     LOGI("============step 1============");
814     InitSubSchemaDb();
815     std::vector<std::string> devices;
816     std::string device = "device_";
817     Query query = Query::Select().EqualTo("$.field_name1", 1);
818 
819     /**
820      * @tc.steps: step2. deviceA subscribe query to 33 devices, and return overlimit
821     */
822     LOGI("============step 2============");
823     for (int i = 0; i < 33; i++) {
824         devices.push_back(device + std::to_string(i));
825     }
826     EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query, true) == OVER_MAX_LIMITS);
827 
828     /**
829      * @tc.steps: step3. deviceA subscribe query with limit
830     */
831     LOGI("============step 3============");
832     devices.clear();
833     devices.push_back("device_B");
834     Query query2 = Query::Select().EqualTo("$.field_name1", 1).Limit(20, 0);
835     EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query2, true) == NOT_SUPPORT);
836 
837     /**
838      * @tc.steps: step4. deviceA subscribe query with orderBy
839     */
840     LOGI("============step 4============");
841     Query query3 = Query::Select().EqualTo("$.field_name1", 1).OrderBy("$.field_name7");
842     EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query3, true) == NOT_SUPPORT);
843 }
844 
845 /**
846  * @tc.name: subscribeSync003
847  * @tc.desc: test subscribe sync with inkeys query
848  * @tc.type: FUNC
849  * @tc.require:
850  * @tc.author: lidongwei
851  */
852 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync003, TestSize.Level1)
853 {
854     /**
855      * @tc.steps: step1. InitSchemaDb
856      */
857     LOGI("============step 1============");
858     InitSubSchemaDb();
859     std::vector<std::string> devices;
860     devices.push_back(g_deviceB->GetDeviceId());
861     g_deviceB->Online();
862 
863     /**
864      * @tc.steps: step2. deviceB subscribe inkeys(k2k4) query to deviceA
865      */
866     LOGI("============step 2============");
867     Query query = Query::Select().InKeys({KEY_2, KEY_4});
868     g_deviceB->Subscribe(QuerySyncObject(query), true, 1);
869 
870     /**
871      * @tc.steps: step3. deviceA put k1-k5 and wait
872      */
873     LOGI("============step 3============");
874     EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
875         {KEY_1, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
876         {KEY_2, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
877         {KEY_3, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
878         {KEY_4, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
879         {KEY_5, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
880     }));
881     WaitUntilNotify(*g_deviceB);
882 
883     /**
884      * @tc.steps: step4. deviceB has k2k4, has no k1k3k5
885      */
886     LOGI("============step 4============");
887     VirtualDataItem item;
888     EXPECT_EQ(g_deviceB->GetData(KEY_2, item), E_OK);
889     EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
890     EXPECT_EQ(g_deviceB->GetData(KEY_4, item), E_OK);
891     EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
892     EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
893     EXPECT_EQ(g_deviceB->GetData(KEY_3, item), -E_NOT_FOUND);
894     EXPECT_EQ(g_deviceB->GetData(KEY_5, item), -E_NOT_FOUND);
895 }
896 
897 /**
898  * @tc.name: subscribeSync004
899  * @tc.desc: test subscribe sync with inkeys query
900  * @tc.type: FUNC
901  * @tc.require:
902  * @tc.author: lidongwei
903  */
904 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync004, TestSize.Level1)
905 {
906     /**
907      * @tc.steps: step1. InitSchemaDb
908      */
909     LOGI("============step 1============");
910     InitSubSchemaDb();
911     std::vector<std::string> devices;
912     devices.push_back(g_deviceB->GetDeviceId());
913 
914     /**
915      * @tc.steps: step2. deviceB subscribe inkeys(k3k5) and equal to query to deviceA
916      */
917     LOGI("============step 2============");
918     Query query = Query::Select().InKeys({KEY_3, KEY_5}).EqualTo("$.field_name3", 100); // 100 for test.
919     g_deviceB->Subscribe(QuerySyncObject(query), true, 2);
920 
921     /**
922      * @tc.steps: step3. deviceA put k1v2,k3v2,k5v1 and wait
923      */
924     LOGI("============step 3============");
925     EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
926         {KEY_1, Value(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end())},
927         {KEY_3, Value(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end())},
928         {KEY_5, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
929     }));
930     WaitUntilNotify(*g_deviceB);
931 
932     /**
933      * @tc.steps: step4. deviceB has k3, has no k1k5
934      */
935     LOGI("============step 4============");
936     VirtualDataItem item;
937     EXPECT_EQ(g_deviceB->GetData(KEY_3, item), E_OK);
938     EXPECT_EQ(item.value, Value(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end()));
939     EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
940     EXPECT_EQ(g_deviceB->GetData(KEY_5, item), -E_NOT_FOUND);
941 }
942 
943 /**
944  * @tc.name: subscribeSync005
945  * @tc.desc: test subscribe sync with inkeys query
946  * @tc.type: FUNC
947  * @tc.require:
948  * @tc.author: lidongwei
949  */
950 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync005, TestSize.Level1)
951 {
952     /**
953      * @tc.steps: step1. InitSchemaDb
954      */
955     LOGI("============step 1============");
956     InitSubSchemaDb();
957     std::vector<std::string> devices;
958     devices.push_back(g_deviceB->GetDeviceId());
959 
960     /**
961      * @tc.steps: step2. deviceB subscribe inkeys(k1, key6) and prefix key "k" query to deviceA
962      */
963     LOGI("============step 2============");
964     Key key6 { 'k', '6' };
965     Query query = Query::Select().InKeys({KEY_1, key6}).PrefixKey({ 'k' });
966     g_deviceB->Subscribe(QuerySyncObject(query), true, 3);
967 
968     /**
969      * @tc.steps: step3. deviceA put k1,key6 and wait
970      */
971     LOGI("============step 3============");
972     EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
973         {key6, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
974         {KEY_1, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
975     }));
976     WaitUntilNotify(*g_deviceB);
977 
978     /**
979      * @tc.steps: step4. deviceB has key6, has no k1
980      */
981     LOGI("============step 4============");
982     VirtualDataItem item;
983     EXPECT_EQ(g_deviceB->GetData(key6, item), E_OK);
984     EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
985     EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
986 }
987 
988 
989 /**
990  * @tc.name: subscribeSync006
991  * @tc.desc: test one device unsubscribe no effect other device
992  * @tc.type: FUNC
993  * @tc.require:
994  * @tc.author: zhangqiquan
995  */
996 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync006, TestSize.Level1)
997 {
998     /**
999      * @tc.steps: step1. InitSchemaDb
1000      */
1001     LOGI("============step 1============");
1002     InitSubSchemaDb();
1003     std::vector<std::string> devices;
1004     devices.push_back(g_deviceB->GetDeviceId());
1005     devices.push_back(g_deviceC->GetDeviceId());
1006 
1007     /**
1008      * @tc.steps: step2. deviceB unsubscribe inkeys(k1, key6) and prefix key "k" query to deviceA
1009      */
1010     LOGI("============step 2============");
1011     Key key6 { 'k', '6' };
1012     Query query = Query::Select().InKeys({KEY_1, key6}).PrefixKey({ 'k' });
1013     g_deviceB->Online();
1014     g_deviceC->Online();
1015     g_deviceB->Subscribe(QuerySyncObject(query), true, 3);
1016     g_deviceC->Subscribe(QuerySyncObject(query), true, 3);
1017 
1018     /**
1019      * @tc.steps: step3. deviceC unsubscribe
1020      */
1021     LOGI("============step 3============");
1022     g_deviceC->UnSubscribe(QuerySyncObject(query), true, 3);
1023 
1024     /**
1025      * @tc.steps: step4. deviceA put k1,key6 and wait
1026      */
1027     LOGI("============step 4============");
1028     EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
1029         {key6, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
1030         {KEY_1, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
1031     }));
1032     WaitUntilNotify(*g_deviceB);
1033 
1034     /**
1035      * @tc.steps: step5. deviceB has key6, has no k1
1036      */
1037     LOGI("============step 5============");
1038     VirtualDataItem item;
1039     EXPECT_EQ(g_deviceB->GetData(key6, item), E_OK);
1040     EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
1041     EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
1042 }
1043 
1044 /**
1045  * @tc.name: subscribeSync007
1046  * @tc.desc: test subscribe query with order by write time
1047  * @tc.type: FUNC
1048  * @tc.require:
1049  * @tc.author: zhuwentao
1050  */
1051 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync007, TestSize.Level1)
1052 {
1053     /**
1054      * @tc.steps: step1. InitSchemaDb
1055     */
1056     LOGI("============step 1============");
1057     InitSubSchemaDb();
1058     std::vector<std::string> devices = {"DEVICE_B"};
1059 
1060     /**
1061      * @tc.steps: step2. deviceA subscribe query with order by write time
1062      * * @tc.expected: step2. interface return not support
1063     */
1064     Query query = Query::Select().EqualTo("$.field_name1", 1).OrderByWriteTime(false);
1065     EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query, true) == NOT_SUPPORT);
1066     EXPECT_TRUE(g_schemaKvDelegatePtr->UnSubscribeRemoteQuery(devices, nullptr, query, true) == NOT_SUPPORT);
1067 }
1068 
1069 /**
1070  * @tc.name: SubscribeSync008
1071  * @tc.desc: test subscribe with reopen db
1072  * @tc.type: FUNC
1073  * @tc.require:
1074  * @tc.author: zhangqiquan
1075  */
1076 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync008, TestSize.Level1)
1077 {
1078     /**
1079      * @tc.steps: step1. InitSchemaDb
1080      */
1081     std::shared_ptr<DBInfoHandleTest> handleTest = std::make_shared<DBInfoHandleTest>();
1082     RuntimeConfig::SetDBInfoHandle(handleTest);
1083 
1084     LOGI("============step 1============");
1085     InitSubSchemaDb();
1086     std::vector<std::string> devices;
1087     devices.push_back(g_deviceB->GetDeviceId());
1088 
1089     /**
1090      * @tc.steps: step2. deviceB subscribe query to deviceA
1091      */
1092     LOGI("============step 2============");
1093     Key key6 { 'k', '6' };
1094     Query query = Query::Select();
1095     g_deviceB->Subscribe(QuerySyncObject(query), true, 3);
1096 
1097     /**
1098      * @tc.steps: step3. deviceA put k1,key6 and wait
1099      */
1100     LOGI("============step 3============");
1101     EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
1102         {key6, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
1103     }));
1104     EXPECT_EQ(g_schemaMgr.CloseKvStore(g_schemaKvDelegatePtr), OK);
1105     g_schemaKvDelegatePtr = nullptr;
1106     InitSubSchemaDb();
1107     g_deviceB->Online();
1108     WaitUntilNotify(*g_deviceB);
1109 
1110     /**
1111      * @tc.steps: step4. deviceB has key6
1112      */
1113     LOGI("============step 4============");
1114     VirtualDataItem item;
1115     if (g_deviceB->GetData(key6, item) == E_OK) {
1116         EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
1117     }
1118     RuntimeConfig::SetDBInfoHandle(nullptr);
1119 }
1120 
1121 namespace {
CreateKvVirtualDevice(const std::string & deviceName)1122 KvVirtualDevice *CreateKvVirtualDevice(const std::string &deviceName)
1123 {
1124     KvVirtualDevice *device = nullptr;
1125     do {
1126         if (g_communicatorAggregator == nullptr) {
1127             break;
1128         }
1129         device = new (std::nothrow) KvVirtualDevice(deviceName);
1130         if (device == nullptr) {
1131             break;
1132         }
1133         auto interface = new (std::nothrow) VirtualSingleVerSyncDBInterface();
1134         if (interface == nullptr) {
1135             delete device;
1136             device = nullptr;
1137             break;
1138         }
1139         interface->SetSchemaInfo(SCHEMA_STRING);
1140         EXPECT_EQ(device->Initialize(g_communicatorAggregator, interface), E_OK);
1141     } while (false);
1142     return device;
1143 }
1144 }
1145 
1146 /**
1147  * @tc.name: SubscribeSync009
1148  * @tc.desc: test subscribe query with 33 device
1149  * @tc.type: FUNC
1150  * @tc.require:
1151  * @tc.author: zhangqiquan
1152  */
1153 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync009, TestSize.Level3)
1154 {
1155     /**
1156      * @tc.steps: step1. InitSchemaDb
1157      */
1158     LOGI("============step 1============");
1159     InitSubSchemaDb();
1160     const int maxDeviceCount = 32;
1161     std::vector<KvVirtualDevice *> devices;
1162     for (int i = 0; i < maxDeviceCount; ++i) {
1163         std::string deviceName = "D_" + std::to_string(i);
1164         auto device = CreateKvVirtualDevice(deviceName);
1165         EXPECT_NE(device, nullptr);
1166         if (device == nullptr) {
1167             continue;
1168         }
1169         devices.push_back(device);
1170     }
1171 
1172     /**
1173      * @tc.steps: step2. 33 device subscribe
1174      */
1175     LOGI("============step 2============");
1176     Query query = Query::Select();
1177     for (const auto &dev: devices) {
1178         dev->Online();
1179         dev->Subscribe(QuerySyncObject(query), true, 1); // sync id is 1
1180     }
1181     g_deviceB->Subscribe(QuerySyncObject(query), true, 1); // sync id is 1
1182     /**
1183      * @tc.steps: step3. 32 unsubscribe
1184      */
1185     LOGI("============step 3============");
__anonf124574d0502(std::map<std::string, int> res) 1186     SyncOperation::UserCallback callback = [](std::map<std::string, int> res) {
1187         ASSERT_EQ(res.size(), 1u);
1188         EXPECT_EQ(res["real_device"], SyncOperation::OP_FINISHED_ALL);
1189     };
1190     for (const auto &dev: devices) {
1191         dev->UnSubscribe(QuerySyncObject(query), true, 1, callback); // sync id is 1
1192         delete dev;
1193     }
1194 }
1195 
1196 /*
1197  * @tc.name: SubscribeSync010
1198  * @tc.desc: test subscribe query cache
1199  * @tc.type: FUNC
1200  * @tc.require:
1201  * @tc.author: zhangqiquan
1202  */
1203 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync010, TestSize.Level1)
1204 {
1205     SubscribeRecorder recorder;
1206     DBInfo dbInfo = {
1207         USER_ID,
1208         APP_ID,
1209         STORE_ID_1,
1210         false,
1211         true
1212     };
1213     Query query = Query::Select();
1214     QuerySyncObject querySyncObject(query);
1215     /**
1216      * @tc.steps: step1. Insert one record twice and remove
1217      */
1218     recorder.RecordSubscribe(dbInfo, DEVICE_A, querySyncObject);
1219     recorder.RecordSubscribe(dbInfo, DEVICE_A, querySyncObject);
1220     recorder.RemoveRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1221     std::map<std::string, std::vector<QuerySyncObject>> subscribeQuery;
1222     recorder.GetSubscribeQuery(dbInfo, subscribeQuery);
1223     for (const auto &entry: subscribeQuery) {
1224         EXPECT_EQ(entry.second.size(), 0u);
1225     }
1226     /**
1227      * @tc.steps: step2. Remove no exist data
1228      */
1229     recorder.RemoveRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1230     recorder.GetSubscribeQuery(dbInfo, subscribeQuery);
1231     for (const auto &entry: subscribeQuery) {
1232         EXPECT_EQ(entry.second.size(), 0u);
1233     }
1234     /**
1235      * @tc.steps: step3. insert two data and remove one data
1236      */
1237     recorder.RecordSubscribe(dbInfo, DEVICE_A, querySyncObject);
1238     Query query2 = Query::Select().EqualTo("test", "test");
1239     recorder.RecordSubscribe(dbInfo, DEVICE_A, QuerySyncObject(query2));
1240     recorder.RemoveRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1241     recorder.GetSubscribeQuery(dbInfo, subscribeQuery);
1242     for (const auto &entry: subscribeQuery) {
1243         EXPECT_EQ(entry.second.size(), 1u);
1244     }
1245     /**
1246      * @tc.steps: step4. remove no exist data
1247      */
1248     dbInfo.storeId = STORE_ID_2;
1249     recorder.RemoveRemoteSubscribe(dbInfo, DEVICE_A);
1250     dbInfo.storeId = STORE_ID_1;
1251     recorder.GetSubscribeQuery(dbInfo, subscribeQuery);
1252     for (const auto &entry: subscribeQuery) {
1253         EXPECT_EQ(entry.second.size(), 1u);
1254     }
1255 }
1256 
1257 /*
1258  * @tc.name: SubscribeSync011
1259  * @tc.desc: test subscribe query cache
1260  * @tc.type: FUNC
1261  * @tc.require:
1262  * @tc.author: zhangqiquan
1263  */
1264 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync011, TestSize.Level1)
1265 {
1266     SubscribeRecorder recorder;
1267     DBInfo dbInfo = {
1268             USER_ID,
1269             APP_ID,
1270             STORE_ID_1,
1271             false,
1272             true
1273     };
1274     /**
1275      * @tc.steps: step1. Insert 2 record in db1 and 1 record in db2
1276      */
1277     Query query = Query::Select();
1278     QuerySyncObject querySyncObject(query);
1279     recorder.RecordSubscribe(dbInfo, DEVICE_A, querySyncObject);
1280     recorder.RecordSubscribe(dbInfo, DEVICE_B, querySyncObject);
1281     DBInfo dbInfo2 = dbInfo;
1282     dbInfo2.storeId = STORE_ID_2;
1283     recorder.RecordSubscribe(dbInfo2, DEVICE_B, querySyncObject);
1284     /**
1285      * @tc.steps: step2. Insert 2 record in db1
1286      */
1287     recorder.RemoveRemoteSubscribe(dbInfo);
1288     std::map<std::string, std::vector<QuerySyncObject>> subscribeQuery;
1289     recorder.GetSubscribeQuery(dbInfo, subscribeQuery);
1290     EXPECT_EQ(subscribeQuery.size(), 0u);
1291     recorder.GetSubscribeQuery(dbInfo2, subscribeQuery);
1292     EXPECT_EQ(subscribeQuery.size(), 1u);
1293     /**
1294      * @tc.steps: step3. Insert 1 record in db2
1295      */
1296     recorder.RemoveRemoteSubscribe(dbInfo2);
1297     subscribeQuery.clear();
1298     recorder.GetSubscribeQuery(dbInfo2, subscribeQuery);
1299     EXPECT_EQ(subscribeQuery.size(), 0u);
1300 }
1301 
1302 /**
1303  * @tc.name: subscribeSync012
1304  * @tc.desc: test one device unsubscribe no effect other device
1305  * @tc.type: FUNC
1306  * @tc.require:
1307  * @tc.author: zhangqiquan
1308  */
1309 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync012, TestSize.Level1)
1310 {
1311     /**
1312      * @tc.steps: step1. InitSchemaDb
1313      */
1314     LOGI("============step 1============");
1315     InitSubSchemaDb();
1316     std::vector<std::string> devices;
1317     devices.push_back(g_deviceB->GetDeviceId());
1318     devices.push_back(g_deviceC->GetDeviceId());
1319 
1320     /**
1321      * @tc.steps: step2. deviceB unsubscribe inkeys(k1, key6) and prefix key "k" query to deviceA
1322      */
1323     LOGI("============step 2============");
1324     Key key6 { 'k', '6' };
1325     Query query = Query::Select();
1326     g_deviceB->Online();
1327     g_deviceC->Online();
1328     g_deviceB->Subscribe(QuerySyncObject(query), true, 3);
1329     g_deviceC->Subscribe(QuerySyncObject(query), true, 3);
1330 
1331     /**
1332      * @tc.steps: step3. deviceC unsubscribe
1333      */
1334     LOGI("============step 3============");
1335     g_deviceC->Offline();
1336 
1337     /**
1338      * @tc.steps: step4. deviceA put k1,key6 and wait
1339      */
1340     LOGI("============step 4============");
1341     const uint8_t putItemCount = 10u;
1342     std::vector<Key> dataKeys;
1343     for (uint8_t i = 0u; i < putItemCount; ++i) {
1344         Key key = { i };
1345         dataKeys.push_back(key);
1346         EXPECT_EQ(g_schemaKvDelegatePtr->Put(key, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())), OK);
1347         WaitUntilNotify(*g_deviceB);
1348     }
1349     /**
1350      * @tc.steps: step5. deviceB has key6, has no k1
1351      */
1352     LOGI("============step 5============");
1353     for (const auto &key: dataKeys) {
1354         VirtualDataItem item;
1355         EXPECT_EQ(g_deviceB->GetData(key, item), E_OK);
1356         EXPECT_EQ(g_deviceC->GetData(key, item), -E_NOT_FOUND);
1357     }
1358 }
1359 
1360 /*
1361  * @tc.name: SubscribeSync013
1362  * @tc.desc: test subscribe query cache with remote support false
1363  * @tc.type: FUNC
1364  * @tc.require:
1365  * @tc.author: zhangqiquan
1366  */
1367 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync013, TestSize.Level1)
1368 {
1369     std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1370     RuntimeConfig::SetDBInfoHandle(handle);
1371     handle->SetLocalIsSupport(true);
1372     DBInfo dbInfo = {
1373         USER_ID,
1374         APP_ID,
1375         STORE_ID_1,
1376         false,
1377         true
1378     };
1379     RuntimeContext::GetInstance()->SetRemoteOptimizeCommunication(DEVICE_A, false);
1380     Query query = Query::Select();
1381     QuerySyncObject querySyncObject(query);
1382     /**
1383      * @tc.steps: step1. Insert one record
1384      */
1385     RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1386     std::map<std::string, std::vector<QuerySyncObject>> subscribeQuery;
1387     RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
1388     EXPECT_EQ(subscribeQuery.size(), 1u);
1389     /**
1390      * @tc.steps: step2. Remove one record
1391      */
1392     RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1393     RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
1394     EXPECT_EQ(subscribeQuery[DEVICE_A].size(), 0u);
1395     /**
1396      * @tc.steps: step3. Record again and remove by dbInfo and device
1397      */
1398     RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1399     RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, DEVICE_A);
1400     RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
1401     EXPECT_EQ(subscribeQuery.size(), 0u);
1402     /**
1403      * @tc.steps: step4. Record again and remove by device
1404      */
1405     RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1406     RuntimeContext::GetInstance()->RemoveRemoteSubscribe(DEVICE_A);
1407     RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
1408     EXPECT_EQ(subscribeQuery.size(), 0u);
1409     /**
1410      * @tc.steps: step5. Record again and remove by dbInfo
1411      */
1412     RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1413     RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo);
1414     RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
1415     EXPECT_EQ(subscribeQuery.size(), 0u);
1416     RuntimeConfig::SetDBInfoHandle(nullptr);
1417 }
1418 /**
1419  * @tc.name: SubscribeSync014
1420  * @tc.desc: test device subscribe with put a lot of times
1421  * @tc.type: FUNC
1422  * @tc.require:
1423  * @tc.author: zhangqiquan
1424  */
1425 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync014, TestSize.Level3)
1426 {
1427     /**
1428     * @tc.steps: step1. InitSchemaDb
1429     */
1430     LOGI("============step 1============");
1431     InitSubSchemaDb();
1432     std::vector<std::string> devices;
1433     devices.push_back(g_deviceB->GetDeviceId());
1434     /**
1435      * @tc.steps: step2. deviceB unsubscribe inkeys(k1, key6) and prefix key "k" query to deviceA
1436      */
1437     LOGI("============step 2============");
1438     Key key6 { 'k', '6' };
1439     Query query = Query::Select();
1440     g_deviceB->Online();
1441     g_deviceB->Subscribe(QuerySyncObject(query), true, 1);
1442     /**
1443      * @tc.steps: step3. deviceA put a lot of time
1444      * @tc.expected: step3 put performance was not effected by subscribe
1445      */
1446     LOGI("============step 4============");
1447     std::vector<Key> dataKeys;
1448     const uint64_t PUT_LIMIT_30S = 30 * 1000000; // 30s = 30 * 1000000us
1449     LOGD("BEGIN PUT");
1450     for (uint8_t i = 0u; i < 10u; ++i) { // loop 10 times
1451         Key key = { i };
1452         dataKeys.push_back(key);
1453         uint64_t curTime = 0;
1454         uint64_t lastTime = 0;
1455         EXPECT_EQ(OS::GetCurrentSysTimeInMicrosecond(curTime), E_OK);
1456         lastTime = curTime;
1457         EXPECT_EQ(g_schemaKvDelegatePtr->Put(key, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())), OK);
1458         EXPECT_EQ(OS::GetCurrentSysTimeInMicrosecond(curTime), E_OK);
1459         EXPECT_LE(curTime - lastTime, PUT_LIMIT_30S);
1460     }
1461     LOGD("END PUT");
1462 }
1463 
1464 /**
1465  * @tc.name: subscribeSync015
1466  * @tc.desc: test subscribe query with range
1467  * @tc.type: FUNC
1468  * @tc.require:
1469  * @tc.author: mazhao
1470  */
1471 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync015, TestSize.Level1)
1472 {
1473     /**
1474      * @tc.steps: step1. InitSchemaDb
1475     */
1476     LOGI("============step 1============");
1477     InitSubSchemaDb();
1478     std::vector<std::string> devices = {"DEVICE_B"};
1479 
1480     /**
1481      * @tc.steps: step2. deviceA subscribe query by Range.
1482      * * @tc.expected: step2. interface return not support
1483     */
1484     Query query = Query::Select().Range({}, {});
1485     EXPECT_EQ(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query, true), NOT_SUPPORT);
1486     EXPECT_EQ(g_schemaKvDelegatePtr->UnSubscribeRemoteQuery(devices, nullptr, query, true), NOT_SUPPORT);
1487 }
1488 } // namespace