• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 #include <thread>
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "distributeddb_communicator_common.h"
21 #include "distributeddb_data_generate_unit_test.h"
22 #include "distributeddb_tools_unit_test.h"
23 #include "endian_convert.h"
24 #include "log_print.h"
25 #include "runtime_config.h"
26 #include "thread_pool_test_stub.h"
27 #include "virtual_communicator_aggregator.h"
28 
29 using namespace std;
30 using namespace testing::ext;
31 using namespace DistributedDB;
32 using namespace DistributedDBUnitTest;
33 
34 namespace {
35     EnvHandle g_envDeviceA;
36     EnvHandle g_envDeviceB;
37     EnvHandle g_envDeviceC;
38 
HandleConnectChange(OnOfflineDevice & onlines,const std::string & target,bool isConnect)39 static void HandleConnectChange(OnOfflineDevice &onlines, const std::string &target, bool isConnect)
40 {
41     if (isConnect) {
42         onlines.onlineDevices.insert(target);
43         onlines.latestOnlineDevice = target;
44         onlines.latestOfflineDevice.clear();
45     } else {
46         onlines.onlineDevices.erase(target);
47         onlines.latestOnlineDevice.clear();
48         onlines.latestOfflineDevice = target;
49     }
50 }
51 
52 class DistributedDBCommunicatorTest : public testing::Test {
53 public:
54     static void SetUpTestCase(void);
55     static void TearDownTestCase(void);
56     void SetUp();
57     void TearDown();
58 };
59 
SetUpTestCase(void)60 void DistributedDBCommunicatorTest::SetUpTestCase(void)
61 {
62     /**
63      * @tc.setup: Create and init CommunicatorAggregator and AdapterStub
64      */
65     LOGI("[UT][Test][SetUpTestCase] Enter.");
66     bool errCode = SetUpEnv(g_envDeviceA, DEVICE_NAME_A);
67     ASSERT_EQ(errCode, true);
68     errCode = SetUpEnv(g_envDeviceB, DEVICE_NAME_B);
69     ASSERT_EQ(errCode, true);
70     DoRegTransformFunction();
71     CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
72 }
73 
TearDownTestCase(void)74 void DistributedDBCommunicatorTest::TearDownTestCase(void)
75 {
76     /**
77      * @tc.teardown: Finalize and release CommunicatorAggregator and AdapterStub
78      */
79     LOGI("[UT][Test][TearDownTestCase] Enter.");
80     std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure all thread quiet and memory released
81     TearDownEnv(g_envDeviceA);
82     TearDownEnv(g_envDeviceB);
83     CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
84 }
85 
SetUp()86 void DistributedDBCommunicatorTest::SetUp()
87 {
88     DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
89 }
90 
TearDown()91 void DistributedDBCommunicatorTest::TearDown()
92 {
93     /**
94      * @tc.teardown: Wait 100 ms to make sure all thread quiet
95      */
96     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
97 }
98 
99 /**
100  * @tc.name: Communicator Management 001
101  * @tc.desc: Test alloc and release communicator
102  * @tc.type: FUNC
103  * @tc.require: AR000BVDGG AR000CQE0L
104  * @tc.author: xiaozhenjian
105  */
106 HWTEST_F(DistributedDBCommunicatorTest, CommunicatorManagement001, TestSize.Level1)
107 {
108     /**
109      * @tc.steps: step1. alloc communicator A using label A
110      * @tc.expected: step1. alloc return OK.
111      */
112     int errorNo = E_OK;
113     ICommunicator *commA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
114     EXPECT_EQ(errorNo, E_OK);
115     EXPECT_NE(commA, nullptr);
116 
117     /**
118      * @tc.steps: step2. alloc communicator B using label B
119      * @tc.expected: step2. alloc return OK.
120      */
121     errorNo = E_OK;
122     ICommunicator *commB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
123     EXPECT_EQ(errorNo, E_OK);
124     EXPECT_NE(commA, nullptr);
125 
126     /**
127      * @tc.steps: step3. alloc communicator C using label A
128      * @tc.expected: step3. alloc return not OK.
129      */
130     errorNo = E_OK;
131     ICommunicator *commC = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
132     EXPECT_NE(errorNo, E_OK);
133     EXPECT_EQ(commC, nullptr);
134 
135     /**
136      * @tc.steps: step4. release communicator A and communicator B
137      */
138     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commA);
139     commA = nullptr;
140     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commB);
141     commB = nullptr;
142 
143     /**
144      * @tc.steps: step5. alloc communicator D using label A
145      * @tc.expected: step5. alloc return OK.
146      */
147     errorNo = E_OK;
148     ICommunicator *commD = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
149     EXPECT_EQ(errorNo, E_OK);
150     EXPECT_NE(commD, nullptr);
151 
152     /**
153      * @tc.steps: step6. release communicator D
154      */
155     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commD);
156     commD = nullptr;
157 }
158 
ConnectWaitDisconnect()159 static void ConnectWaitDisconnect()
160 {
161     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
162     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
163     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
164 }
165 
166 /**
167  * @tc.name: Online And Offline 001
168  * @tc.desc: Test functionality triggered by physical devices online and offline
169  * @tc.type: FUNC
170  * @tc.require: AR000BVRNS AR000CQE0H
171  * @tc.author: wudongxing
172  */
173 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline001, TestSize.Level1)
174 {
175     /**
176      * @tc.steps: step1. device A alloc communicator AA using label A and register callback
177      * @tc.expected: step1. no callback.
178      */
179     int errorNo = E_OK;
180     ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
181     ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
182     OnOfflineDevice onlineForAA;
__anon8a05f61d0202(const std::string &target, bool isConnect) 183     commAA->RegOnConnectCallback([&onlineForAA](const std::string &target, bool isConnect) {
184         HandleConnectChange(onlineForAA, target, isConnect);}, nullptr);
185     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
186 
187     /**
188      * @tc.steps: step2. connect device A with device B and then disconnect
189      * @tc.expected: step2. no callback.
190      */
191     ConnectWaitDisconnect();
192     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
193 
194     /**
195      * @tc.steps: step3. device B alloc communicator BB using label B and register callback
196      * @tc.expected: step3. no callback.
197      */
198     ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
199     ASSERT_NOT_NULL_AND_ACTIVATE(commBB);
200     OnOfflineDevice onlineForBB;
__anon8a05f61d0302(const std::string &target, bool isConnect) 201     commBB->RegOnConnectCallback([&onlineForBB](const std::string &target, bool isConnect) {
202         HandleConnectChange(onlineForBB, target, isConnect);}, nullptr);
203     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
204     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
205 
206     /**
207      * @tc.steps: step4. connect device A with device B and then disconnect
208      * @tc.expected: step4. no callback.
209      */
210     ConnectWaitDisconnect();
211     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
212     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
213 
214     /**
215      * @tc.steps: step5. device B alloc communicator BA using label A and register callback
216      * @tc.expected: step5. no callback.
217      */
218     ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
219     ASSERT_NOT_NULL_AND_ACTIVATE(commBA);
220     OnOfflineDevice onlineForBA;
__anon8a05f61d0402(const std::string &target, bool isConnect) 221     commBA->RegOnConnectCallback([&onlineForBA](const std::string &target, bool isConnect) {
222         HandleConnectChange(onlineForBA, target, isConnect);}, nullptr);
223     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
224     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
225     EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
226 
227     /**
228      * @tc.steps: step6. connect device A with device B
229      * @tc.expected: step6. communicator AA has callback of device B online;
230      *                      communicator BA has callback of device A online;
231      *                      communicator BB no callback
232      */
233     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
234     std::this_thread::sleep_for(std::chrono::milliseconds(100));
235     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
236     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
237     EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(1));
238     EXPECT_EQ(onlineForAA.latestOnlineDevice, DEVICE_NAME_B);
239     EXPECT_EQ(onlineForBA.latestOnlineDevice, DEVICE_NAME_A);
240 
241     /**
242      * @tc.steps: step7. disconnect device A with device B
243      * @tc.expected: step7. communicator AA has callback of device B offline;
244      *                      communicator BA has callback of device A offline;
245      *                      communicator BB no callback
246      */
247     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
248     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
249     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
250     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
251     EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
252     EXPECT_EQ(onlineForAA.latestOfflineDevice, DEVICE_NAME_B);
253     EXPECT_EQ(onlineForBA.latestOfflineDevice, DEVICE_NAME_A);
254 
255     // Clean up
256     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
257     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
258     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
259 }
260 
/*
 * Register a connect callback on `communicator` that forwards every notification to
 * HandleConnectChange(online, ...). `online` is captured by reference, so it must stay
 * alive for as long as the callback can fire.
 * The body is wrapped in do { } while (0) so the macro behaves as one statement and
 * must be followed by a semicolon (safe inside if/else without braces).
 */
#define REG_CONNECT_CALLBACK(communicator, online) \
do { \
    (communicator)->RegOnConnectCallback([&online](const std::string &target, bool isConnect) { \
        HandleConnectChange(online, target, isConnect); \
    }, nullptr); \
} while (0)
267 
/*
 * Connect the adapters of device A and device B, then sleep for `waitTime` milliseconds.
 * The body is wrapped in do { } while (0) so the macro behaves as one statement and
 * must be followed by a semicolon; `waitTime` is parenthesized so expressions are safe.
 */
#define CONNECT_AND_WAIT(waitTime) \
do { \
    AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle); \
    std::this_thread::sleep_for(std::chrono::milliseconds(waitTime)); \
} while (0)
273 
274 /**
275  * @tc.name: Online And Offline 002
276  * @tc.desc: Test functionality triggered by alloc and release communicator
277  * @tc.type: FUNC
278  * @tc.require: AR000BVRNT AR000CQE0I
279  * @tc.author: wudongxing
280  */
281 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline002, TestSize.Level1)
282 {
283     /**
284      * @tc.steps: step1. connect device A with device B
285      */
286     CONNECT_AND_WAIT(200); // Sleep 200 ms
287 
288     /**
289      * @tc.steps: step2. device A alloc communicator AA using label A and register callback
290      * @tc.expected: step2. no callback.
291      */
292     int errorNo = E_OK;
293     ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
294     ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
295     OnOfflineDevice onlineForAA;
296     REG_CONNECT_CALLBACK(commAA, onlineForAA);
297     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
298 
299     /**
300      * @tc.steps: step3. device B alloc communicator BB using label B and register callback
301      * @tc.expected: step3. no callback.
302      */
303     ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
304     ASSERT_NOT_NULL_AND_ACTIVATE(commBB);
305     OnOfflineDevice onlineForBB;
306     REG_CONNECT_CALLBACK(commBB, onlineForBB);
307     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
308     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
309 
310     /**
311      * @tc.steps: step4. device B alloc communicator BA using label A and register callback
312      * @tc.expected: step4. communicator AA has callback of device B online;
313      *                      communicator BA has callback of device A online;
314      *                      communicator BB no callback.
315      */
316     ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
317     ASSERT_NOT_NULL_AND_ACTIVATE(commBA);
318     OnOfflineDevice onlineForBA;
319     REG_CONNECT_CALLBACK(commBA, onlineForBA);
320     std::this_thread::sleep_for(std::chrono::milliseconds(100));
321     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
322     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
323     EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(1));
324     EXPECT_EQ(onlineForAA.latestOnlineDevice, DEVICE_NAME_B);
325     EXPECT_EQ(onlineForBA.latestOnlineDevice, DEVICE_NAME_A);
326 
327     /**
328      * @tc.steps: step5. device A alloc communicator AB using label B and register callback
329      * @tc.expected: step5. communicator AB has callback of device B online;
330      *                      communicator BB has callback of device A online;
331      */
332     ICommunicator *commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
333     ASSERT_NOT_NULL_AND_ACTIVATE(commAB);
334     OnOfflineDevice onlineForAB;
335     REG_CONNECT_CALLBACK(commAB, onlineForAB);
336     std::this_thread::sleep_for(std::chrono::milliseconds(100));
337     EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
338     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
339     EXPECT_EQ(onlineForAB.latestOnlineDevice, DEVICE_NAME_B);
340     EXPECT_EQ(onlineForBB.latestOnlineDevice, DEVICE_NAME_A);
341 
342     /**
343      * @tc.steps: step6. device A release communicator AA
344      * @tc.expected: step6. communicator BA has callback of device A offline;
345      *                      communicator AB and BB no callback;
346      */
347     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
348     std::this_thread::sleep_for(std::chrono::milliseconds(100));
349     EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
350     EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
351     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
352     EXPECT_EQ(onlineForBA.latestOfflineDevice, DEVICE_NAME_A);
353 
354     /**
355      * @tc.steps: step7. device B release communicator BA
356      * @tc.expected: step7. communicator AB and BB no callback;
357      */
358     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
359     EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
360     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
361 
362     /**
363      * @tc.steps: step8. device B release communicator BB
364      * @tc.expected: step8. communicator AB has callback of device B offline;
365      */
366     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
367     std::this_thread::sleep_for(std::chrono::milliseconds(100));
368     EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(0));
369     EXPECT_EQ(onlineForAB.latestOfflineDevice, DEVICE_NAME_B);
370 
371     // Clean up
372     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
373     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
374 }
375 
TestRemoteRestart()376 void TestRemoteRestart()
377 {
378     /**
379      * @tc.steps: step1. connect device D with device E
380      */
381     EnvHandle envDeviceD;
382     EnvHandle envDeviceE;
383     SetUpEnv(envDeviceD, "DEVICE_D");
384     SetUpEnv(envDeviceE, "DEVICE_E");
385 
386     /**
387      * @tc.steps: step2. device D alloc communicator DD using label A and register callback
388      */
389     int errorNo = E_OK;
390     ICommunicator *commDD = envDeviceD.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
391     ASSERT_NOT_NULL_AND_ACTIVATE(commDD);
392     OnOfflineDevice onlineForDD;
393     REG_CONNECT_CALLBACK(commDD, onlineForDD);
394 
395     /**
396      * @tc.steps: step3. device E alloc communicator EE using label A and register callback
397      */
398     ICommunicator *commEE = envDeviceE.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
399     ASSERT_NOT_NULL_AND_ACTIVATE(commEE);
400     OnOfflineDevice onlineForEE;
401     REG_CONNECT_CALLBACK(commEE, onlineForEE);
402     /**
403      * @tc.steps: step4. deviceD connect to deviceE
404      * @tc.expected: step4. both communicator has callback;
405      */
406     AdapterStub::ConnectAdapterStub(envDeviceD.adapterHandle, envDeviceE.adapterHandle);
407     std::this_thread::sleep_for(std::chrono::seconds(1));
408     EXPECT_EQ(onlineForDD.onlineDevices.size(), static_cast<size_t>(1));
409     EXPECT_EQ(onlineForEE.onlineDevices.size(), static_cast<size_t>(1));
410 
411     /**
412      * @tc.steps: step5. device E restart
413      */
414     envDeviceE.commAggrHandle->ReleaseCommunicator(commEE);
415     std::this_thread::sleep_for(std::chrono::seconds(1));
416     TearDownEnv(envDeviceE);
417     SetUpEnv(envDeviceE, "DEVICE_E");
418 
419     commEE = envDeviceE.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
420     ASSERT_NOT_NULL_AND_ACTIVATE(commEE);
421     REG_CONNECT_CALLBACK(commEE, onlineForEE);
422     onlineForEE.onlineDevices.clear();
423     /**
424      * @tc.steps: step6. deviceD connect to deviceE again
425      * @tc.expected: step6. communicatorE has callback;
426      */
427     AdapterStub::ConnectAdapterStub(envDeviceD.adapterHandle, envDeviceE.adapterHandle);
428     int reTryTimes = 5;
429     while (onlineForEE.onlineDevices.size() != 1 && reTryTimes > 0) {
430         std::this_thread::sleep_for(std::chrono::seconds(1));
431         reTryTimes--;
432     }
433     EXPECT_EQ(onlineForEE.onlineDevices.size(), static_cast<size_t>(1));
434     // Clean up and disconnect
435     envDeviceD.commAggrHandle->ReleaseCommunicator(commDD);
436     envDeviceE.commAggrHandle->ReleaseCommunicator(commEE);
437     std::this_thread::sleep_for(std::chrono::seconds(1));
438 
439     AdapterStub::DisconnectAdapterStub(envDeviceD.adapterHandle, envDeviceE.adapterHandle);
440     TearDownEnv(envDeviceD);
441     TearDownEnv(envDeviceE);
442 }
443 /**
444  * @tc.name: Online And Offline 003
445  * @tc.desc: Test functionality triggered by remote restart
446  * @tc.type: FUNC
447  * @tc.require: AR000CQE0I
448  * @tc.author: zhangqiquan
449  */
450 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline003, TestSize.Level1)
451 {
452     TestRemoteRestart();
453 }
454 
455 /**
456  * @tc.name: Online And Offline 004
457  * @tc.desc: Test functionality triggered by remote restart with thread pool
458  * @tc.type: FUNC
459  * @tc.require: AR000CQE0I
460  * @tc.author: zhangqiquan
461  */
462 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline004, TestSize.Level1)
463 {
464     auto threadPool = std::make_shared<ThreadPoolTestStub>();
465     RuntimeContext::GetInstance()->SetThreadPool(threadPool);
466     TestRemoteRestart();
467     RuntimeContext::GetInstance()->SetThreadPool(nullptr);
468 }
469 
470 /**
471  * @tc.name: Report Device Connect Change 001
472  * @tc.desc: Test CommunicatorAggregator support report device connect change event
473  * @tc.type: FUNC
474  * @tc.require: AR000DR9KV
475  * @tc.author: xiaozhenjian
476  */
477 HWTEST_F(DistributedDBCommunicatorTest, ReportDeviceConnectChange001, TestSize.Level1)
478 {
479     /**
480      * @tc.steps: step1. device A and device B register connect callback to CommunicatorAggregator
481      */
482     OnOfflineDevice onlineForA;
483     int errCode = g_envDeviceA.commAggrHandle->RegOnConnectCallback(
__anon8a05f61d0502(const std::string &target, bool isConnect) 484         [&onlineForA](const std::string &target, bool isConnect) {
485             HandleConnectChange(onlineForA, target, isConnect);
486         }, nullptr);
487     EXPECT_EQ(errCode, E_OK);
488     OnOfflineDevice onlineForB;
489     errCode = g_envDeviceB.commAggrHandle->RegOnConnectCallback(
__anon8a05f61d0602(const std::string &target, bool isConnect) 490         [&onlineForB](const std::string &target, bool isConnect) {
491             HandleConnectChange(onlineForB, target, isConnect);
492         }, nullptr);
493     EXPECT_EQ(errCode, E_OK);
494 
495     /**
496      * @tc.steps: step2. connect device A with device B
497      * @tc.expected: step2. device A callback B online; device B callback A online;
498      */
499     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
500     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
501     EXPECT_EQ(onlineForA.onlineDevices.size(), static_cast<size_t>(1));
502     EXPECT_EQ(onlineForB.onlineDevices.size(), static_cast<size_t>(1));
503     EXPECT_EQ(onlineForA.latestOnlineDevice, DEVICE_NAME_B);
504     EXPECT_EQ(onlineForB.latestOnlineDevice, DEVICE_NAME_A);
505 
506     /**
507      * @tc.steps: step3. connect device A with device B
508      * @tc.expected: step3. device A callback B offline; device B callback A offline;
509      */
510     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
511     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
512     EXPECT_EQ(onlineForA.onlineDevices.size(), static_cast<size_t>(0));
513     EXPECT_EQ(onlineForB.onlineDevices.size(), static_cast<size_t>(0));
514     EXPECT_EQ(onlineForA.latestOfflineDevice, DEVICE_NAME_B);
515     EXPECT_EQ(onlineForB.latestOfflineDevice, DEVICE_NAME_A);
516 
517     // Clean up
518     g_envDeviceA.commAggrHandle->RegOnConnectCallback(nullptr, nullptr);
519     g_envDeviceB.commAggrHandle->RegOnConnectCallback(nullptr, nullptr);
520 }
521 
522 namespace {
ToLabelType(uint64_t commLabel)523 LabelType ToLabelType(uint64_t commLabel)
524 {
525     uint64_t netOrderLabel = HostToNet(commLabel);
526     uint8_t *eachByte = reinterpret_cast<uint8_t *>(&netOrderLabel);
527     std::vector<uint8_t> realLabel(COMM_LABEL_LENGTH, 0);
528     for (int i = 0; i < static_cast<int>(sizeof(uint64_t)); i++) {
529         realLabel[i] = eachByte[i];
530     }
531     return realLabel;
532 }
533 }
534 
535 /**
536  * @tc.name: Report Communicator Not Found 001
537  * @tc.desc: Test CommunicatorAggregator support report communicator not found event
538  * @tc.type: FUNC
539  * @tc.require: AR000DR9KV
540  * @tc.author: xiaozhenjian
541  */
542 HWTEST_F(DistributedDBCommunicatorTest, ReportCommunicatorNotFound001, TestSize.Level1)
543 {
544     /**
545      * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
546      */
547     std::vector<LabelType> lackLabels;
548     int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(
__anon8a05f61d0802(const LabelType &commLabel, const std::string &userId)549         [&lackLabels](const LabelType &commLabel, const std::string &userId)->int {
550             lackLabels.push_back(commLabel);
551             return -E_NOT_FOUND;
552         }, nullptr);
553     EXPECT_EQ(errCode, E_OK);
554 
555     /**
556      * @tc.steps: step2. connect device A with device B
557      */
558     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
559     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
560 
561     /**
562      * @tc.steps: step3. device A alloc communicator AA using label A and send message to B
563      * @tc.expected: step3. device B callback that label A not found.
564      */
565     ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
566     ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
567     Message *msgForAA = BuildRegedTinyMessage();
568     ASSERT_NE(msgForAA, nullptr);
569     SendConfig conf = {true, false, 0};
570     errCode = commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
571     EXPECT_EQ(errCode, E_OK);
572     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
573     ASSERT_EQ(lackLabels.size(), static_cast<size_t>(1));
574     EXPECT_EQ(lackLabels[0], ToLabelType(LABEL_A));
575 
576     /**
577      * @tc.steps: step4. device B alloc communicator BA using label A and register message callback
578      * @tc.expected: step4. communicator BA will not receive message.
579      */
580     ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
581     ASSERT_NE(commBA, nullptr);
582     Message *recvMsgForBA = nullptr;
__anon8a05f61d0902(const std::string &srcTarget, Message *inMsg) 583     commBA->RegOnMessageCallback([&recvMsgForBA](const std::string &srcTarget, Message *inMsg) {
584         recvMsgForBA = inMsg;
585     }, nullptr);
586     commBA->Activate();
587     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
588     EXPECT_EQ(recvMsgForBA, nullptr);
589 
590     // Clean up
591     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
592     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
593     g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
594     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
595 }
596 
/*
 * Build a tiny message with BuildRegedTinyMessage, give it session id `session` and send it
 * through communicator `comm<src><label>` to DEVICE_NAME_<dst>, expecting E_OK.
 * Requires an int `errCode` and the communicator variable in the calling scope. The
 * ASSERT_NE leaves the calling function when the message cannot be built.
 * The body is wrapped in do { } while (0) so the macro behaves as one statement and
 * must be followed by a semicolon.
 */
#define DO_SEND_MESSAGE(src, dst, label, session) \
do { \
    Message *msgFor##src##label = BuildRegedTinyMessage(); \
    ASSERT_NE(msgFor##src##label, nullptr); \
    msgFor##src##label->SetSessionId(session); \
    SendConfig conf = {true, false, 0}; \
    errCode = comm##src##label->SendMessage(DEVICE_NAME_##dst, msgFor##src##label, conf); \
    EXPECT_EQ(errCode, E_OK); \
} while (0)
606 
/*
 * Build a giant message with BuildRegedGiantMessage(size) and send it through communicator
 * `comm<src><label>` to DEVICE_NAME_<dst>, expecting E_OK.
 * Requires an int `errCode` and the communicator variable in the calling scope. The
 * ASSERT_NE leaves the calling function when the message cannot be built.
 * The body is wrapped in do { } while (0) so the macro behaves as one statement and
 * must be followed by a semicolon.
 */
#define DO_SEND_GIANT_MESSAGE(src, dst, label, size) \
do { \
    Message *msgFor##src##label = BuildRegedGiantMessage(size); \
    ASSERT_NE(msgFor##src##label, nullptr); \
    SendConfig conf = {false, false, 0}; \
    errCode = comm##src##label->SendMessage(DEVICE_NAME_##dst, msgFor##src##label, conf); \
    EXPECT_EQ(errCode, E_OK); \
} while (0)
615 
/*
 * Declare `comm<src><label>` in the calling scope, allocated from g_envDevice<src> with
 * LABEL_<label>, assert and activate it, then send one tiny message with session id
 * `session` to DEVICE_NAME_<dst> via DO_SEND_MESSAGE.
 * Requires an int `errCode` in the calling scope. Not wrapped in a block on purpose: the
 * communicator variable must stay visible after the macro.
 */
#define ALLOC_AND_SEND_MESSAGE(src, dst, label, session) \
    ICommunicator *comm##src##label = \
        g_envDevice##src.commAggrHandle->AllocCommunicator(LABEL_##label, errCode); \
    ASSERT_NOT_NULL_AND_ACTIVATE(comm##src##label); \
    DO_SEND_MESSAGE(src, dst, label, session)
620 
/*
 * Declare `srcTargetFor<src><label>` and `recvMsgFor<src><label>` in the calling scope and
 * register a message callback on `comm<src><label>` that stores the sender and the message
 * pointer of each delivered message into them.
 * Both variables are captured by reference and must outlive the callback. Not wrapped in a
 * block on purpose: the variables must stay visible after the macro.
 */
#define REG_MESSAGE_CALLBACK(src, label) \
    std::string srcTargetFor##src##label; \
    Message *recvMsgFor##src##label = nullptr; \
    comm##src##label->RegOnMessageCallback( \
        [&srcTargetFor##src##label, &recvMsgFor##src##label](const std::string &srcTarget, Message *inMsg) { \
            srcTargetFor##src##label = srcTarget; \
            recvMsgFor##src##label = inMsg; \
        }, \
        nullptr);
629 
630 /**
631  * @tc.name: ReDeliver Message 001
632  * @tc.desc: Test CommunicatorAggregator support redeliver message
633  * @tc.type: FUNC
634  * @tc.require: AR000DR9KV
635  * @tc.author: xiaozhenjian
636  */
637 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage001, TestSize.Level1)
638 {
639     /**
640      * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
641      */
642     std::vector<LabelType> lackLabels;
643     int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(
__anon8a05f61d0a02(const LabelType &commLabel, const std::string &userId)644         [&lackLabels](const LabelType &commLabel, const std::string &userId)->int {
645             lackLabels.push_back(commLabel);
646             return E_OK;
647         }, nullptr);
648     EXPECT_EQ(errCode, E_OK);
649 
650     /**
651      * @tc.steps: step2. connect device A with device B
652      */
653     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
654     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
655 
656     /**
657      * @tc.steps: step3. device A alloc communicator AA using label A and send message to B
658      * @tc.expected: step3. device B callback that label A not found.
659      */
660     ALLOC_AND_SEND_MESSAGE(A, B, A, 100); // session id 100
661     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
662     ASSERT_EQ(lackLabels.size(), static_cast<size_t>(1));
663     EXPECT_EQ(lackLabels[0], ToLabelType(LABEL_A));
664 
665     /**
666      * @tc.steps: step4. device A alloc communicator AB using label B and send message to B
667      * @tc.expected: step4. device B callback that label B not found.
668      */
669     ALLOC_AND_SEND_MESSAGE(A, B, B, 200); // session id 200
670     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
671     ASSERT_EQ(lackLabels.size(), static_cast<size_t>(2));
672     EXPECT_EQ(lackLabels[1], ToLabelType(LABEL_B)); // 1 for second element
673 
674     /**
675      * @tc.steps: step5. device B alloc communicator BA using label A and register message callback
676      * @tc.expected: step5. communicator BA will receive message.
677      */
678     ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
679     ASSERT_NE(commBA, nullptr);
680     REG_MESSAGE_CALLBACK(B, A);
681     commBA->Activate();
682     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
683     EXPECT_EQ(srcTargetForBA, DEVICE_NAME_A);
684     ASSERT_NE(recvMsgForBA, nullptr);
685     EXPECT_EQ(recvMsgForBA->GetSessionId(), 100U); // session id 100
686     delete recvMsgForBA;
687     recvMsgForBA = nullptr;
688 
689     /**
690      * @tc.steps: step6. device B alloc communicator BB using label B and register message callback
691      * @tc.expected: step6. communicator BB will receive message.
692      */
693     ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errCode);
694     ASSERT_NE(commBB, nullptr);
695     REG_MESSAGE_CALLBACK(B, B);
696     commBB->Activate();
697     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
698     EXPECT_EQ(srcTargetForBB, DEVICE_NAME_A);
699     ASSERT_NE(recvMsgForBB, nullptr);
700     EXPECT_EQ(recvMsgForBB->GetSessionId(), 200U); // session id 200
701     delete recvMsgForBB;
702     recvMsgForBB = nullptr;
703 
704     // Clean up
705     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
706     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
707     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
708     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
709     g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
710     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
711 }
712 
713 /**
714  * @tc.name: ReDeliver Message 002
715  * @tc.desc: Test CommunicatorAggregator support redeliver message by order
716  * @tc.type: FUNC
717  * @tc.require: AR000DR9KV
718  * @tc.author: xiaozhenjian
719  */
720 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage002, TestSize.Level1)
721 {
722     /**
723      * @tc.steps: step1. device C create CommunicatorAggregator and initialize
724      */
725     bool step1 = SetUpEnv(g_envDeviceC, DEVICE_NAME_C);
726     ASSERT_EQ(step1, true);
727 
728     /**
729      * @tc.steps: step2. device B register communicator not found callback to CommunicatorAggregator
730      */
731     int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback([](const LabelType &commLabel,
__anon8a05f61d0b02(const LabelType &commLabel, const std::string &userId)732         const std::string &userId)->int {
733         return E_OK;
734     }, nullptr);
735     EXPECT_EQ(errCode, E_OK);
736 
737     /**
738      * @tc.steps: step3. connect device A with device B, then device B with device C
739      */
740     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
741     AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
742     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
743 
744     /**
745      * @tc.steps: step4. device A alloc communicator AA using label A and send message to B
746      */
747     ALLOC_AND_SEND_MESSAGE(A, B, A, 100); // session id 100
748     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
749 
750     /**
751      * @tc.steps: step5. device C alloc communicator CA using label A and send message to B
752      */
753     ALLOC_AND_SEND_MESSAGE(C, B, A, 200); // session id 200
754     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
755     DO_SEND_MESSAGE(A, B, A, 300); // session id 300
756     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
757     DO_SEND_MESSAGE(C, B, A, 400); // session id 400
758     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
759 
760     /**
761      * @tc.steps: step6. device B alloc communicator BA using label A and register message callback
762      * @tc.expected: step6. communicator BA will receive message in order of sessionid 100, 200, 300, 400.
763      */
764     ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
765     ASSERT_NE(commBA, nullptr);
766     std::vector<std::pair<std::string, Message *>> msgCallbackForBA;
__anon8a05f61d0c02(const std::string &srcTarget, Message *inMsg) 767     commBA->RegOnMessageCallback([&msgCallbackForBA](const std::string &srcTarget, Message *inMsg) {
768         msgCallbackForBA.push_back({srcTarget, inMsg});
769     }, nullptr);
770     commBA->Activate();
771     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
772     ASSERT_EQ(msgCallbackForBA.size(), static_cast<size_t>(4)); // total 4 callback
773     EXPECT_EQ(msgCallbackForBA[0].first, DEVICE_NAME_A); // the 0 order element
774     EXPECT_EQ(msgCallbackForBA[1].first, DEVICE_NAME_C); // the 1 order element
775     EXPECT_EQ(msgCallbackForBA[2].first, DEVICE_NAME_A); // the 2 order element
776     EXPECT_EQ(msgCallbackForBA[3].first, DEVICE_NAME_C); // the 3 order element
777     for (uint32_t i = 0; i < msgCallbackForBA.size(); i++) {
778         EXPECT_EQ(msgCallbackForBA[i].second->GetSessionId(), static_cast<uint32_t>((i + 1) * 100)); // 1 sessionid 100
779         delete msgCallbackForBA[i].second;
780         msgCallbackForBA[i].second = nullptr;
781     }
782 
783     // Clean up
784     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
785     g_envDeviceC.commAggrHandle->ReleaseCommunicator(commCA);
786     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
787     g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
788     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
789     AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
790     TearDownEnv(g_envDeviceC);
791 }
792 
793 /**
794  * @tc.name: ReDeliver Message 003
795  * @tc.desc: For observe memory in unusual scenario
796  * @tc.type: FUNC
797  * @tc.require: AR000DR9KV
798  * @tc.author: xiaozhenjian
799  */
800 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage003, TestSize.Level2)
801 {
802     /**
803      * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
804      */
805     int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback([](const LabelType &commLabel,
__anon8a05f61d0d02(const LabelType &commLabel, const std::string &userId)806         const std::string &userId)->int {
807         return E_OK;
808     }, nullptr);
809     EXPECT_EQ(errCode, E_OK);
810 
811     /**
812      * @tc.steps: step2. connect device A with device B
813      */
814     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
815     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
816 
817     /**
818      * @tc.steps: step3. device A alloc communicator AA,AB,AC using label A,B,C
819      */
820     ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
821     ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
822     ICommunicator *commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errCode);
823     ASSERT_NOT_NULL_AND_ACTIVATE(commAB);
824     ICommunicator *commAC = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_C, errCode);
825     ASSERT_NOT_NULL_AND_ACTIVATE(commAC);
826     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
827 
828     /**
829      * @tc.steps: step4. device A Continuously send tiny message to B using communicator AA,AB,AC
830      */
831     for (int turn = 0; turn < 11; turn++) { // Total 11 turns
832         DO_SEND_MESSAGE(A, B, A, 0);
833         DO_SEND_MESSAGE(A, B, B, 0);
834         DO_SEND_MESSAGE(A, B, C, 0);
835     }
836 
837     /**
838      * @tc.steps: step5. device A Continuously send giant message to B using communicator AA,AB,AC
839      */
840     for (int turn = 0; turn < 5; turn++) { // Total 5 turns
841         DO_SEND_GIANT_MESSAGE(A, B, A, (3 * 1024 * 1024)); // 3 MBytes, 1024 is scale
842         DO_SEND_GIANT_MESSAGE(A, B, B, (6 * 1024 * 1024)); // 6 MBytes, 1024 is scale
843         DO_SEND_GIANT_MESSAGE(A, B, C, (7 * 1024 * 1024)); // 7 MBytes, 1024 is scale
844     }
845     DO_SEND_GIANT_MESSAGE(A, B, A, (30 * 1024 * 1024)); // 30 MBytes, 1024 is scale
846 
847     /**
848      * @tc.steps: step6. wait a long time then send last frame
849      */
850     for (int sec = 0; sec < 15; sec++) { // Total 15 s
851         std::this_thread::sleep_for(std::chrono::seconds(1)); // Sleep 1 s
852         LOGI("[UT][Test][ReDeliverMessage003] Sleep and wait=%d.", sec);
853     }
854     DO_SEND_MESSAGE(A, B, A, 0);
855     std::this_thread::sleep_for(std::chrono::seconds(1)); // Sleep 1 s
856 
857     // Clean up
858     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
859     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
860     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAC);
861     g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
862     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
863 }
864 
865 namespace {
SetUpEnv(const std::string & localDev,const std::string & supportDev,const std::shared_ptr<DBStatusAdapter> & adapter,bool isSupport,EnvHandle & envDevice)866     void SetUpEnv(const std::string &localDev, const std::string &supportDev,
867         const std::shared_ptr<DBStatusAdapter> &adapter, bool isSupport, EnvHandle &envDevice)
868     {
869         std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
870         adapter->SetDBInfoHandle(handle);
871         adapter->SetRemoteOptimizeCommunication(supportDev, isSupport);
872         SetUpEnv(envDevice, localDev, adapter);
873     }
874 
InitCommunicator(DBInfo & dbInfo,EnvHandle & envDevice,OnOfflineDevice & onlineCallback,ICommunicator * & comm)875     void InitCommunicator(DBInfo &dbInfo, EnvHandle &envDevice, OnOfflineDevice &onlineCallback, ICommunicator *&comm)
876     {
877         dbInfo.userId = USER_ID;
878         dbInfo.appId = APP_ID;
879         dbInfo.storeId = STORE_ID_1;
880         dbInfo.isNeedSync = true;
881         dbInfo.syncDualTupleMode = false;
882         std::string label = DBCommon::GenerateHashLabel(dbInfo);
883         std::vector<uint8_t> commLabel(label.begin(), label.end());
884         int errorNo = E_OK;
885         comm = envDevice.commAggrHandle->AllocCommunicator(commLabel, errorNo);
886         ASSERT_NOT_NULL_AND_ACTIVATE(comm);
887         REG_CONNECT_CALLBACK(comm, onlineCallback);
888     }
889 }
890 
891 /**
892   * @tc.name: CommunicationOptimization001
893   * @tc.desc: Test notify with isSupport true.
894   * @tc.type: FUNC
895   * @tc.require: AR000HGD0B
896   * @tc.author: zhangqiquan
897   */
898 HWTEST_F(DistributedDBCommunicatorTest, CommunicationOptimization001, TestSize.Level3)
899 {
900     auto *pAggregator = new VirtualCommunicatorAggregator();
901     ASSERT_NE(pAggregator, nullptr);
902     const std::string deviceA = "DEVICES_A";
903     const std::string deviceB = "DEVICES_B";
904     RuntimeContext::GetInstance()->SetCommunicatorAggregator(pAggregator);
905     /**
906      * @tc.steps: step1. set up env
907      */
908     EnvHandle envDeviceA;
909     std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
910     SetUpEnv(deviceA, deviceB, adapterA, true, envDeviceA);
911 
912     EnvHandle envDeviceB;
913     std::shared_ptr<DBStatusAdapter> adapterB = std::make_shared<DBStatusAdapter>();
914     SetUpEnv(deviceB, deviceA, adapterB, true, envDeviceB);
915 
916     /**
917      * @tc.steps: step2. device alloc communicator using label and register callback
918      */
919     DBInfo dbInfo;
920     ICommunicator *commAA = nullptr;
921     OnOfflineDevice onlineForAA;
922     InitCommunicator(dbInfo, envDeviceA, onlineForAA, commAA);
923 
924     ICommunicator *commBB = nullptr;
925     OnOfflineDevice onlineForBB;
926     InitCommunicator(dbInfo, envDeviceB, onlineForBB, commBB);
927 
928     /**
929      * @tc.steps: step3. connect device A with device B
930      */
931     AdapterStub::ConnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
932 
933     /**
934      * @tc.steps: step4. wait for label exchange
935      * @tc.expected: step4. both communicator has no callback;
936      */
937     std::this_thread::sleep_for(std::chrono::seconds(1));
938     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
939     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
940 
941     /**
942      * @tc.steps: step5. both notify
943      * @tc.expected: step5. both has callback;
944      */
945     RuntimeConfig::NotifyDBInfos({ deviceB }, { dbInfo });
946     adapterA->NotifyDBInfos({ deviceB }, { dbInfo });
947     adapterB->NotifyDBInfos({ deviceA }, { dbInfo });
948     std::this_thread::sleep_for(std::chrono::seconds(1));
949 
950     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
951 
952     dbInfo.isNeedSync = false;
953     adapterA->NotifyDBInfos({ deviceB }, { dbInfo });
954     adapterB->NotifyDBInfos({ deviceA }, { dbInfo });
955     std::this_thread::sleep_for(std::chrono::seconds(1));
956     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
957 
958     // Clean up and disconnect
959     envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
960     envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
961     std::this_thread::sleep_for(std::chrono::seconds(1));
962 
963     AdapterStub::DisconnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
964 
965     TearDownEnv(envDeviceA);
966     TearDownEnv(envDeviceB);
967     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
968 }
969 
970 /**
971   * @tc.name: CommunicationOptimization002
972   * @tc.desc: Test notify with isSupport true and can offline by device change.
973   * @tc.type: FUNC
974   * @tc.require: AR000HGD0B
975   * @tc.author: zhangqiquan
976   */
977 HWTEST_F(DistributedDBCommunicatorTest, CommunicationOptimization002, TestSize.Level3)
978 {
979     auto *pAggregator = new VirtualCommunicatorAggregator();
980     ASSERT_NE(pAggregator, nullptr);
981     const std::string deviceA = "DEVICES_A";
982     const std::string deviceB = "DEVICES_B";
983     RuntimeContext::GetInstance()->SetCommunicatorAggregator(pAggregator);
984     /**
985      * @tc.steps: step1. set up env
986      */
987     EnvHandle envDeviceA;
988     std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
989     SetUpEnv(deviceA, deviceB, adapterA, true, envDeviceA);
990 
991     EnvHandle envDeviceB;
992     std::shared_ptr<DBStatusAdapter> adapterB = std::make_shared<DBStatusAdapter>();
993     SetUpEnv(deviceB, deviceA, adapterB, true, envDeviceB);
994 
995     /**
996      * @tc.steps: step2. device alloc communicator using label and register callback
997      */
998     DBInfo dbInfo;
999     ICommunicator *commAA = nullptr;
1000     OnOfflineDevice onlineForAA;
1001     InitCommunicator(dbInfo, envDeviceA, onlineForAA, commAA);
1002     /**
1003      * @tc.steps: step3. connect device A with device B
1004      */
1005     AdapterStub::ConnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1006     std::this_thread::sleep_for(std::chrono::seconds(1));
1007 
1008     /**
1009      * @tc.steps: step5. A notify remote
1010      * @tc.expected: step5. A has callback;
1011      */
1012     adapterA->NotifyDBInfos({ deviceB }, { dbInfo });
1013     std::this_thread::sleep_for(std::chrono::seconds(1));
1014     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
1015 
1016     AdapterStub::DisconnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1017     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
1018 
1019     // Clean up and disconnect
1020     envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1021     std::this_thread::sleep_for(std::chrono::seconds(1));
1022 
1023     TearDownEnv(envDeviceA);
1024     TearDownEnv(envDeviceB);
1025     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1026 }
1027 
1028 /**
1029   * @tc.name: CommunicationOptimization003
1030   * @tc.desc: Test notify with isSupport false.
1031   * @tc.type: FUNC
1032   * @tc.require: AR000HGD0B
1033   * @tc.author: zhangqiquan
1034   */
1035 HWTEST_F(DistributedDBCommunicatorTest, CommunicationOptimization003, TestSize.Level3)
1036 {
1037     auto *pAggregator = new VirtualCommunicatorAggregator();
1038     ASSERT_NE(pAggregator, nullptr);
1039     const std::string deviceA = "DEVICES_A";
1040     const std::string deviceB = "DEVICES_B";
1041     RuntimeContext::GetInstance()->SetCommunicatorAggregator(pAggregator);
1042     /**
1043      * @tc.steps: step1. set up env
1044      */
1045     EnvHandle envDeviceA;
1046     std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1047     SetUpEnv(deviceA, deviceB, adapterA, false, envDeviceA);
1048     EnvHandle envDeviceB;
1049     std::shared_ptr<DBStatusAdapter> adapterB = std::make_shared<DBStatusAdapter>();
1050     SetUpEnv(deviceB, deviceA, adapterB, false, envDeviceB);
1051     /**
1052      * @tc.steps: step2. device alloc communicator using label and register callback
1053      */
1054     DBInfo dbInfo;
1055     ICommunicator *commAA = nullptr;
1056     OnOfflineDevice onlineForAA;
1057     InitCommunicator(dbInfo, envDeviceA, onlineForAA, commAA);
1058     ICommunicator *commBB = nullptr;
1059     OnOfflineDevice onlineForBB;
1060     InitCommunicator(dbInfo, envDeviceB, onlineForBB, commBB);
1061     /**
1062      * @tc.steps: step3. connect device A with device B
1063      */
1064     AdapterStub::ConnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1065     /**
1066      * @tc.steps: step4. wait for label exchange
1067      * @tc.expected: step4. both communicator has no callback;
1068      */
1069     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
1070     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
1071     /**
1072      * @tc.steps: step5. A notify remote
1073      * @tc.expected: step5. B has no callback;
1074      */
1075     adapterA->NotifyDBInfos({ deviceB }, { dbInfo });
1076     std::this_thread::sleep_for(std::chrono::seconds(1));
1077     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
1078     /**
1079      * @tc.steps: step6. A notify local
1080      * @tc.expected: step6. B has no callback;
1081      */
1082     dbInfo.isNeedSync = false;
1083     onlineForAA.onlineDevices.clear();
1084     adapterA->NotifyDBInfos({ deviceA }, { dbInfo });
1085     std::this_thread::sleep_for(std::chrono::seconds(1));
1086     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
1087     // Clean up and disconnect
1088     envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1089     envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
1090     std::this_thread::sleep_for(std::chrono::seconds(1));
1091     AdapterStub::DisconnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1092     TearDownEnv(envDeviceA);
1093     TearDownEnv(envDeviceB);
1094     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1095 }
1096 
1097 /**
1098   * @tc.name: CommunicationOptimization004
1099   * @tc.desc: Test notify with isSupport false and it be will changed by communication.
1100   * @tc.type: FUNC
1101   * @tc.require: AR000HGD0B
1102   * @tc.author: zhangqiquan
1103   */
1104 HWTEST_F(DistributedDBCommunicatorTest, CommunicationOptimization004, TestSize.Level3)
1105 {
1106     const std::string deviceA = "DEVICES_A";
1107     const std::string deviceB = "DEVICES_B";
1108     /**
1109      * @tc.steps: step1. set up env
1110      */
1111     EnvHandle envDeviceA;
1112     std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1113     SetUpEnv(deviceA, deviceB, adapterA, false, envDeviceA);
1114     RuntimeContext::GetInstance()->SetCommunicatorAggregator(envDeviceA.commAggrHandle);
1115     EnvHandle envDeviceB;
1116     std::shared_ptr<DBStatusAdapter> adapterB = std::make_shared<DBStatusAdapter>();
1117     SetUpEnv(deviceB, deviceA, adapterB, false, envDeviceB);
1118     /**
1119      * @tc.steps: step2. device alloc communicator using label and register callback
1120      */
1121     DBInfo dbInfo;
1122     ICommunicator *commAA = nullptr;
1123     OnOfflineDevice onlineForAA;
1124     InitCommunicator(dbInfo, envDeviceA, onlineForAA, commAA);
1125     ICommunicator *commBB = nullptr;
1126     OnOfflineDevice onlineForBB;
1127     InitCommunicator(dbInfo, envDeviceB, onlineForBB, commBB);
1128     /**
1129      * @tc.steps: step3. connect device A with device B
1130      */
1131     EXPECT_EQ(adapterA->IsSupport(deviceB), false);
1132     AdapterStub::ConnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1133     std::this_thread::sleep_for(std::chrono::seconds(1));
1134     EXPECT_EQ(adapterA->IsSupport(deviceB), true);
1135     // Clean up and disconnect
1136     envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1137     envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
1138     std::this_thread::sleep_for(std::chrono::seconds(1));
1139     AdapterStub::DisconnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1140     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1141     envDeviceA.commAggrHandle = nullptr;
1142     TearDownEnv(envDeviceA);
1143     TearDownEnv(envDeviceB);
1144 }
1145 
1146 /**
1147   * @tc.name: CommunicationOptimization005
1148   * @tc.desc: Test notify with isSupport false and send label exchange.
1149   * @tc.type: FUNC
1150   * @tc.require: AR000HGD0B
1151   * @tc.author: zhangqiquan
1152   */
1153 HWTEST_F(DistributedDBCommunicatorTest, CommunicationOptimization005, TestSize.Level3)
1154 {
1155     const std::string deviceA = "DEVICES_A";
1156     const std::string deviceB = "DEVICES_B";
1157     /**
1158      * @tc.steps: step1. set up env
1159      */
1160     EnvHandle envDeviceA;
1161     std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1162     std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1163     handle->SetLocalIsSupport(false);
1164     adapterA->SetDBInfoHandle(handle);
1165     SetUpEnv(envDeviceA, deviceA, adapterA);
1166     RuntimeContext::GetInstance()->SetCommunicatorAggregator(envDeviceA.commAggrHandle);
1167     EnvHandle envDeviceB;
1168     SetUpEnv(envDeviceB, deviceB, nullptr);
1169     /**
1170      * @tc.steps: step2. connect device A with device B
1171      */
1172     EXPECT_EQ(adapterA->IsSupport(deviceB), false);
1173     AdapterStub::ConnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1174     std::this_thread::sleep_for(std::chrono::seconds(1));
1175     /**
1176      * @tc.steps: step3. device alloc communicator using label and register callback
1177      */
1178     DBInfo dbInfo;
1179     ICommunicator *commAA = nullptr;
1180     OnOfflineDevice onlineForAA;
1181     InitCommunicator(dbInfo, envDeviceA, onlineForAA, commAA);
1182     ICommunicator *commBB = nullptr;
1183     OnOfflineDevice onlineForBB;
1184     InitCommunicator(dbInfo, envDeviceB, onlineForBB, commBB);
1185     std::this_thread::sleep_for(std::chrono::seconds(1));
1186     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
1187 
1188     // Clean up and disconnect
1189     envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1190     envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
1191     std::this_thread::sleep_for(std::chrono::seconds(1));
1192     AdapterStub::DisconnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1193     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1194     envDeviceA.commAggrHandle = nullptr;
1195     TearDownEnv(envDeviceA);
1196     TearDownEnv(envDeviceB);
1197 }
1198 
1199 /**
1200   * @tc.name: DbStatusAdapter001
1201   * @tc.desc: Test notify with isSupport false.
1202   * @tc.type: FUNC
1203   * @tc.require: AR000HGD0B
1204   * @tc.author: zhangqiquan
1205   */
1206 HWTEST_F(DistributedDBCommunicatorTest, DbStatusAdapter001, TestSize.Level1)
1207 {
1208     auto *pAggregator = new VirtualCommunicatorAggregator();
1209     ASSERT_NE(pAggregator, nullptr);
1210     const std::string deviceA = "DEVICES_A";
1211     const std::string deviceB = "DEVICES_B";
1212     RuntimeContext::GetInstance()->SetCommunicatorAggregator(pAggregator);
1213 
1214     std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1215     std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1216     adapterA->SetDBInfoHandle(handle);
1217     adapterA->SetRemoteOptimizeCommunication(deviceB, true);
1218     std::string actualRemoteDevInfo;
1219     size_t remoteInfoCount = 0u;
1220     size_t localCount = 0u;
1221     DBInfo dbInfo;
1222     adapterA->NotifyDBInfos({ deviceA }, { dbInfo });
1223 
1224     dbInfo = {
1225         USER_ID,
1226         APP_ID,
1227         STORE_ID_1,
1228         true,
1229         false
1230     };
1231     adapterA->NotifyDBInfos({ deviceA }, { dbInfo });
1232     dbInfo.isNeedSync = false;
1233     adapterA->NotifyDBInfos({ deviceA }, { dbInfo });
1234     adapterA->NotifyDBInfos({ deviceB }, { dbInfo });
1235 
1236     size_t remoteChangeCount = 0u;
1237     adapterA->SetDBStatusChangeCallback(
__anon8a05f61d0f02(const std::string &devInfo, const std::vector<DBInfo> &dbInfos) 1238         [&actualRemoteDevInfo, &remoteInfoCount](const std::string &devInfo, const std::vector<DBInfo> &dbInfos) {
1239             actualRemoteDevInfo = devInfo;
1240             remoteInfoCount = dbInfos.size();
1241         },
__anon8a05f61d1002() 1242         [&localCount]() {
1243             localCount++;
1244         },
__anon8a05f61d1102(const std::string &dev) 1245         [&remoteChangeCount, deviceB](const std::string &dev) {
1246             remoteChangeCount++;
1247             EXPECT_EQ(dev, deviceB);
1248         });
1249     std::this_thread::sleep_for(std::chrono::seconds(1));
1250     EXPECT_EQ(actualRemoteDevInfo, deviceB);
1251     EXPECT_EQ(remoteInfoCount, 1u);
1252     adapterA->SetRemoteOptimizeCommunication(deviceB, false);
1253     std::this_thread::sleep_for(std::chrono::seconds(1));
1254     EXPECT_EQ(remoteChangeCount, 1u);
1255     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1256 }
1257 
1258 /**
1259   * @tc.name: DbStatusAdapter002
1260   * @tc.desc: Test adapter clear cache.
1261   * @tc.type: FUNC
1262   * @tc.require: AR000HGD0B
1263   * @tc.author: zhangqiquan
1264   */
1265 HWTEST_F(DistributedDBCommunicatorTest, DbStatusAdapter002, TestSize.Level1) {
1266     const std::string deviceB = "DEVICES_B";
1267     std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1268     std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1269     adapterA->SetDBInfoHandle(handle);
1270     adapterA->SetRemoteOptimizeCommunication(deviceB, true);
1271     EXPECT_TRUE(adapterA->IsSupport(deviceB));
1272     adapterA->SetDBInfoHandle(handle);
1273     adapterA->SetRemoteOptimizeCommunication(deviceB, false);
1274     EXPECT_FALSE(adapterA->IsSupport(deviceB));
1275 }
1276 
1277 /**
1278   * @tc.name: DbStatusAdapter003
1279   * @tc.desc: Test adapter get local dbInfo.
1280   * @tc.type: FUNC
1281   * @tc.require: AR000HGD0B
1282   * @tc.author: zhangqiquan
1283   */
1284 HWTEST_F(DistributedDBCommunicatorTest, DbStatusAdapter003, TestSize.Level1) {
1285     const std::string deviceB = "DEVICES_B";
1286     std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1287     std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1288     adapterA->SetDBInfoHandle(handle);
1289     handle->SetLocalIsSupport(true);
1290     std::vector<DBInfo> dbInfos;
1291     EXPECT_EQ(adapterA->GetLocalDBInfos(dbInfos), E_OK);
1292     handle->SetLocalIsSupport(false);
1293     EXPECT_EQ(adapterA->GetLocalDBInfos(dbInfos), E_OK);
1294     adapterA->SetDBInfoHandle(handle);
1295     EXPECT_EQ(adapterA->GetLocalDBInfos(dbInfos), -E_NOT_SUPPORT);
1296 }
1297 
1298 /**
1299   * @tc.name: DbStatusAdapter004
1300   * @tc.desc: Test adapter clear cache will get callback.
1301   * @tc.type: FUNC
1302   * @tc.require: AR000HGD0B
1303   * @tc.author: zhangqiquan
1304   */
1305 HWTEST_F(DistributedDBCommunicatorTest, DbStatusAdapter004, TestSize.Level1)
1306 {
1307     const std::string deviceB = "DEVICES_B";
1308     std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1309     std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1310     adapterA->SetDBInfoHandle(handle);
1311     handle->SetLocalIsSupport(true);
1312     std::vector<DBInfo> dbInfos;
1313     DBInfo dbInfo = {
1314         USER_ID,
1315         APP_ID,
1316         STORE_ID_1,
1317         true,
1318         true
1319     };
1320     dbInfos.push_back(dbInfo);
1321     dbInfo.storeId = STORE_ID_2;
1322     dbInfos.push_back(dbInfo);
1323     dbInfo.storeId = STORE_ID_3;
1324     dbInfo.isNeedSync = false;
1325     dbInfos.push_back(dbInfo);
1326     adapterA->NotifyDBInfos({"dev"}, dbInfos);
1327     std::this_thread::sleep_for(std::chrono::seconds(1));
1328     size_t notifyCount = 0u;
1329     adapterA->SetDBStatusChangeCallback([&notifyCount](const std::string &devInfo,
__anon8a05f61d1202(const std::string &devInfo, const std::vector<DBInfo> &dbInfos) 1330         const std::vector<DBInfo> &dbInfos) {
1331         LOGD("on callback");
1332         for (const auto &dbInfo: dbInfos) {
1333             EXPECT_FALSE(dbInfo.isNeedSync);
1334         }
1335         notifyCount = dbInfos.size();
1336     }, nullptr, nullptr);
1337     adapterA->SetDBInfoHandle(nullptr);
1338     EXPECT_EQ(notifyCount, 2u); // 2 dbInfo is need sync now it should be offline
1339 }
1340 
1341 /**
1342   * @tc.name: DbStatusAdapter005
1343   * @tc.desc: Test adapter is need auto sync.
1344   * @tc.type: FUNC
1345   * @tc.require: AR000HGD0B
1346   * @tc.author: zhangqiquan
1347   */
1348 HWTEST_F(DistributedDBCommunicatorTest, DbStatusAdapter005, TestSize.Level1)
1349 {
1350     const std::string deviceB = "DEVICES_B";
1351     std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1352     std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1353     adapterA->SetDBInfoHandle(handle);
1354     handle->SetLocalIsSupport(true);
1355     EXPECT_EQ(adapterA->IsNeedAutoSync(USER_ID, APP_ID, STORE_ID_1, deviceB), true);
1356     handle->SetNeedAutoSync(false);
1357     EXPECT_EQ(adapterA->IsNeedAutoSync(USER_ID, APP_ID, STORE_ID_1, deviceB), false);
1358     handle->SetLocalIsSupport(false);
1359     EXPECT_EQ(adapterA->IsNeedAutoSync(USER_ID, APP_ID, STORE_ID_1, deviceB), false);
1360     adapterA->SetDBInfoHandle(handle);
1361     EXPECT_EQ(adapterA->IsNeedAutoSync(USER_ID, APP_ID, STORE_ID_1, deviceB), true);
1362     adapterA->SetDBInfoHandle(nullptr);
1363     EXPECT_EQ(adapterA->IsNeedAutoSync(USER_ID, APP_ID, STORE_ID_1, deviceB), true);
1364 }
1365 }