• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 #include <thread>
18 #include "db_errno.h"
19 #include "distributeddb_communicator_common.h"
20 #include "distributeddb_tools_unit_test.h"
21 #include "endian_convert.h"
22 #include "log_print.h"
23 #include "thread_pool_test_stub.h"
24 
25 using namespace std;
26 using namespace testing::ext;
27 using namespace DistributedDB;
28 
29 namespace {
30     EnvHandle g_envDeviceA;
31     EnvHandle g_envDeviceB;
32     EnvHandle g_envDeviceC;
33 
HandleConnectChange(OnOfflineDevice & onlines,const std::string & target,bool isConnect)34 static void HandleConnectChange(OnOfflineDevice &onlines, const std::string &target, bool isConnect)
35 {
36     if (isConnect) {
37         onlines.onlineDevices.insert(target);
38         onlines.latestOnlineDevice = target;
39         onlines.latestOfflineDevice.clear();
40     } else {
41         onlines.onlineDevices.erase(target);
42         onlines.latestOnlineDevice.clear();
43         onlines.latestOfflineDevice = target;
44     }
45 }
46 
47 class DistributedDBCommunicatorTest : public testing::Test {
48 public:
49     static void SetUpTestCase(void);
50     static void TearDownTestCase(void);
51     void SetUp();
52     void TearDown();
53 };
54 
SetUpTestCase(void)55 void DistributedDBCommunicatorTest::SetUpTestCase(void)
56 {
57     /**
58      * @tc.setup: Create and init CommunicatorAggregator and AdapterStub
59      */
60     LOGI("[UT][Test][SetUpTestCase] Enter.");
61     bool errCode = SetUpEnv(g_envDeviceA, DEVICE_NAME_A);
62     ASSERT_EQ(errCode, true);
63     errCode = SetUpEnv(g_envDeviceB, DEVICE_NAME_B);
64     ASSERT_EQ(errCode, true);
65     DoRegTransformFunction();
66     CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
67 }
68 
TearDownTestCase(void)69 void DistributedDBCommunicatorTest::TearDownTestCase(void)
70 {
71     /**
72      * @tc.teardown: Finalize and release CommunicatorAggregator and AdapterStub
73      */
74     LOGI("[UT][Test][TearDownTestCase] Enter.");
75     std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure all thread quiet and memory released
76     TearDownEnv(g_envDeviceA);
77     TearDownEnv(g_envDeviceB);
78     CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
79 }
80 
SetUp()81 void DistributedDBCommunicatorTest::SetUp()
82 {
83     DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
84 }
85 
TearDown()86 void DistributedDBCommunicatorTest::TearDown()
87 {
88     /**
89      * @tc.teardown: Wait 100 ms to make sure all thread quiet
90      */
91     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
92 }
93 
94 /**
95  * @tc.name: Communicator Management 001
96  * @tc.desc: Test alloc and release communicator
97  * @tc.type: FUNC
98  * @tc.require: AR000BVDGG AR000CQE0L
99  * @tc.author: xiaozhenjian
100  */
101 HWTEST_F(DistributedDBCommunicatorTest, CommunicatorManagement001, TestSize.Level1)
102 {
103     /**
104      * @tc.steps: step1. alloc communicator A using label A
105      * @tc.expected: step1. alloc return OK.
106      */
107     int errorNo = E_OK;
108     ICommunicator *commA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
109     EXPECT_EQ(errorNo, E_OK);
110     EXPECT_NE(commA, nullptr);
111 
112     /**
113      * @tc.steps: step2. alloc communicator B using label B
114      * @tc.expected: step2. alloc return OK.
115      */
116     errorNo = E_OK;
117     ICommunicator *commB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
118     EXPECT_EQ(errorNo, E_OK);
119     EXPECT_NE(commA, nullptr);
120 
121     /**
122      * @tc.steps: step3. alloc communicator C using label A
123      * @tc.expected: step3. alloc return not OK.
124      */
125     errorNo = E_OK;
126     ICommunicator *commC = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
127     EXPECT_NE(errorNo, E_OK);
128     EXPECT_EQ(commC, nullptr);
129 
130     /**
131      * @tc.steps: step4. release communicator A and communicator B
132      */
133     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commA);
134     commA = nullptr;
135     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commB);
136     commB = nullptr;
137 
138     /**
139      * @tc.steps: step5. alloc communicator D using label A
140      * @tc.expected: step5. alloc return OK.
141      */
142     errorNo = E_OK;
143     ICommunicator *commD = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
144     EXPECT_EQ(errorNo, E_OK);
145     EXPECT_NE(commD, nullptr);
146 
147     /**
148      * @tc.steps: step6. release communicator D
149      */
150     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commD);
151     commD = nullptr;
152 }
153 
ConnectWaitDisconnect()154 static void ConnectWaitDisconnect()
155 {
156     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
157     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
158     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
159 }
160 
161 /**
162  * @tc.name: Online And Offline 001
163  * @tc.desc: Test functionality triggered by physical devices online and offline
164  * @tc.type: FUNC
165  * @tc.require: AR000BVRNS AR000CQE0H
166  * @tc.author: wudongxing
167  */
168 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline001, TestSize.Level1)
169 {
170     /**
171      * @tc.steps: step1. device A alloc communicator AA using label A and register callback
172      * @tc.expected: step1. no callback.
173      */
174     int errorNo = E_OK;
175     ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
176     ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
177     OnOfflineDevice onlineForAA;
__anone94f6f3c0202(const std::string &target, bool isConnect) 178     commAA->RegOnConnectCallback([&onlineForAA](const std::string &target, bool isConnect) {
179         HandleConnectChange(onlineForAA, target, isConnect);}, nullptr);
180     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
181 
182     /**
183      * @tc.steps: step2. connect device A with device B and then disconnect
184      * @tc.expected: step2. no callback.
185      */
186     ConnectWaitDisconnect();
187     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
188 
189     /**
190      * @tc.steps: step3. device B alloc communicator BB using label B and register callback
191      * @tc.expected: step3. no callback.
192      */
193     ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
194     ASSERT_NOT_NULL_AND_ACTIVATE(commBB);
195     OnOfflineDevice onlineForBB;
__anone94f6f3c0302(const std::string &target, bool isConnect) 196     commBB->RegOnConnectCallback([&onlineForBB](const std::string &target, bool isConnect) {
197         HandleConnectChange(onlineForBB, target, isConnect);}, nullptr);
198     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
199     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
200 
201     /**
202      * @tc.steps: step4. connect device A with device B and then disconnect
203      * @tc.expected: step4. no callback.
204      */
205     ConnectWaitDisconnect();
206     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
207     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
208 
209     /**
210      * @tc.steps: step5. device B alloc communicator BA using label A and register callback
211      * @tc.expected: step5. no callback.
212      */
213     ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
214     ASSERT_NOT_NULL_AND_ACTIVATE(commBA);
215     OnOfflineDevice onlineForBA;
__anone94f6f3c0402(const std::string &target, bool isConnect) 216     commBA->RegOnConnectCallback([&onlineForBA](const std::string &target, bool isConnect) {
217         HandleConnectChange(onlineForBA, target, isConnect);}, nullptr);
218     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
219     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
220     EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
221 
222     /**
223      * @tc.steps: step6. connect device A with device B
224      * @tc.expected: step6. communicator AA has callback of device B online;
225      *                      communicator BA has callback of device A online;
226      *                      communicator BB no callback
227      */
228     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
229     std::this_thread::sleep_for(std::chrono::milliseconds(100));
230     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
231     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
232     EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(1));
233     EXPECT_EQ(onlineForAA.latestOnlineDevice, DEVICE_NAME_B);
234     EXPECT_EQ(onlineForBA.latestOnlineDevice, DEVICE_NAME_A);
235 
236     /**
237      * @tc.steps: step7. disconnect device A with device B
238      * @tc.expected: step7. communicator AA has callback of device B offline;
239      *                      communicator BA has callback of device A offline;
240      *                      communicator BB no callback
241      */
242     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
243     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
244     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
245     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
246     EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
247     EXPECT_EQ(onlineForAA.latestOfflineDevice, DEVICE_NAME_B);
248     EXPECT_EQ(onlineForBA.latestOfflineDevice, DEVICE_NAME_A);
249 
250     // Clean up
251     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
252     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
253     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
254 }
255 
/*
 * Register a connect-change callback on `communicator` that forwards every notification
 * to HandleConnectChange(online, target, isConnect). `online` is captured by reference,
 * so it must stay alive for as long as the callback is registered.
 * Wrapped in do { } while (0) so that "REG_CONNECT_CALLBACK(c, o);" is a single statement
 * and stays valid inside an unbraced if/else.
 */
#define REG_CONNECT_CALLBACK(communicator, online) \
do { \
    (communicator)->RegOnConnectCallback([&online](const std::string &target, bool isConnect) { \
        HandleConnectChange(online, target, isConnect); \
    }, nullptr); \
} while (0)
262 
/*
 * Connect the adapter stubs of device A and device B, then sleep for `waitTime` milliseconds.
 * Wrapped in do { } while (0) so that "CONNECT_AND_WAIT(t);" is a single statement
 * and stays valid inside an unbraced if/else.
 */
#define CONNECT_AND_WAIT(waitTime) \
do { \
    AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle); \
    std::this_thread::sleep_for(std::chrono::milliseconds(waitTime)); \
} while (0)
268 
269 /**
270  * @tc.name: Online And Offline 002
271  * @tc.desc: Test functionality triggered by alloc and release communicator
272  * @tc.type: FUNC
273  * @tc.require: AR000BVRNT AR000CQE0I
274  * @tc.author: wudongxing
275  */
276 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline002, TestSize.Level1)
277 {
278     /**
279      * @tc.steps: step1. connect device A with device B
280      */
281     CONNECT_AND_WAIT(200); // Sleep 200 ms
282 
283     /**
284      * @tc.steps: step2. device A alloc communicator AA using label A and register callback
285      * @tc.expected: step2. no callback.
286      */
287     int errorNo = E_OK;
288     ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
289     ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
290     OnOfflineDevice onlineForAA;
291     REG_CONNECT_CALLBACK(commAA, onlineForAA);
292     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
293 
294     /**
295      * @tc.steps: step3. device B alloc communicator BB using label B and register callback
296      * @tc.expected: step3. no callback.
297      */
298     ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
299     ASSERT_NOT_NULL_AND_ACTIVATE(commBB);
300     OnOfflineDevice onlineForBB;
301     REG_CONNECT_CALLBACK(commBB, onlineForBB);
302     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
303     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
304 
305     /**
306      * @tc.steps: step4. device B alloc communicator BA using label A and register callback
307      * @tc.expected: step4. communicator AA has callback of device B online;
308      *                      communicator BA has callback of device A online;
309      *                      communicator BB no callback.
310      */
311     ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
312     ASSERT_NOT_NULL_AND_ACTIVATE(commBA);
313     OnOfflineDevice onlineForBA;
314     REG_CONNECT_CALLBACK(commBA, onlineForBA);
315     std::this_thread::sleep_for(std::chrono::milliseconds(100));
316     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
317     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
318     EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(1));
319     EXPECT_EQ(onlineForAA.latestOnlineDevice, DEVICE_NAME_B);
320     EXPECT_EQ(onlineForBA.latestOnlineDevice, DEVICE_NAME_A);
321 
322     /**
323      * @tc.steps: step5. device A alloc communicator AB using label B and register callback
324      * @tc.expected: step5. communicator AB has callback of device B online;
325      *                      communicator BB has callback of device A online;
326      */
327     ICommunicator *commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
328     ASSERT_NOT_NULL_AND_ACTIVATE(commAB);
329     OnOfflineDevice onlineForAB;
330     REG_CONNECT_CALLBACK(commAB, onlineForAB);
331     std::this_thread::sleep_for(std::chrono::milliseconds(100));
332     EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
333     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
334     EXPECT_EQ(onlineForAB.latestOnlineDevice, DEVICE_NAME_B);
335     EXPECT_EQ(onlineForBB.latestOnlineDevice, DEVICE_NAME_A);
336 
337     /**
338      * @tc.steps: step6. device A release communicator AA
339      * @tc.expected: step6. communicator BA has callback of device A offline;
340      *                      communicator AB and BB no callback;
341      */
342     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
343     std::this_thread::sleep_for(std::chrono::milliseconds(100));
344     EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
345     EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
346     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
347     EXPECT_EQ(onlineForBA.latestOfflineDevice, DEVICE_NAME_A);
348 
349     /**
350      * @tc.steps: step7. device B release communicator BA
351      * @tc.expected: step7. communicator AB and BB no callback;
352      */
353     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
354     EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
355     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
356 
357     /**
358      * @tc.steps: step8. device B release communicator BB
359      * @tc.expected: step8. communicator AB has callback of device B offline;
360      */
361     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
362     std::this_thread::sleep_for(std::chrono::milliseconds(100));
363     EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(0));
364     EXPECT_EQ(onlineForAB.latestOfflineDevice, DEVICE_NAME_B);
365 
366     // Clean up
367     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
368     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
369 }
370 
TestRemoteRestart()371 void TestRemoteRestart()
372 {
373     /**
374      * @tc.steps: step1. connect device D with device E
375      */
376     EnvHandle envDeviceD;
377     EnvHandle envDeviceE;
378     SetUpEnv(envDeviceD, "DEVICE_D");
379     SetUpEnv(envDeviceE, "DEVICE_E");
380 
381     /**
382      * @tc.steps: step2. device D alloc communicator DD using label A and register callback
383      */
384     int errorNo = E_OK;
385     ICommunicator *commDD = envDeviceD.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
386     ASSERT_NOT_NULL_AND_ACTIVATE(commDD);
387     OnOfflineDevice onlineForDD;
388     REG_CONNECT_CALLBACK(commDD, onlineForDD);
389 
390     /**
391      * @tc.steps: step3. device E alloc communicator EE using label A and register callback
392      */
393     ICommunicator *commEE = envDeviceE.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
394     ASSERT_NOT_NULL_AND_ACTIVATE(commEE);
395     OnOfflineDevice onlineForEE;
396     REG_CONNECT_CALLBACK(commEE, onlineForEE);
397     /**
398      * @tc.steps: step4. deviceD connect to deviceE
399      * @tc.expected: step4. both communicator has callback;
400      */
401     AdapterStub::ConnectAdapterStub(envDeviceD.adapterHandle, envDeviceE.adapterHandle);
402     std::this_thread::sleep_for(std::chrono::seconds(1));
403     EXPECT_EQ(onlineForDD.onlineDevices.size(), static_cast<size_t>(1));
404     EXPECT_EQ(onlineForEE.onlineDevices.size(), static_cast<size_t>(1));
405 
406     /**
407      * @tc.steps: step5. device E restart
408      */
409     envDeviceE.commAggrHandle->ReleaseCommunicator(commEE);
410     std::this_thread::sleep_for(std::chrono::seconds(1));
411     TearDownEnv(envDeviceE);
412     SetUpEnv(envDeviceE, "DEVICE_E");
413 
414     commEE = envDeviceE.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
415     ASSERT_NOT_NULL_AND_ACTIVATE(commEE);
416     REG_CONNECT_CALLBACK(commEE, onlineForEE);
417     onlineForEE.onlineDevices.clear();
418     /**
419      * @tc.steps: step6. deviceD connect to deviceE again
420      * @tc.expected: step6. communicatorE has callback;
421      */
422     AdapterStub::ConnectAdapterStub(envDeviceD.adapterHandle, envDeviceE.adapterHandle);
423     std::this_thread::sleep_for(std::chrono::seconds(1));
424     EXPECT_EQ(onlineForEE.onlineDevices.size(), static_cast<size_t>(1));
425     // Clean up and disconnect
426     envDeviceD.commAggrHandle->ReleaseCommunicator(commDD);
427     envDeviceE.commAggrHandle->ReleaseCommunicator(commEE);
428     std::this_thread::sleep_for(std::chrono::seconds(1));
429 
430     AdapterStub::DisconnectAdapterStub(envDeviceD.adapterHandle, envDeviceE.adapterHandle);
431     TearDownEnv(envDeviceD);
432     TearDownEnv(envDeviceE);
433 }
434 /**
435  * @tc.name: Online And Offline 003
436  * @tc.desc: Test functionality triggered by remote restart
437  * @tc.type: FUNC
438  * @tc.require: AR000CQE0I
439  * @tc.author: zhangqiquan
440  */
441 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline003, TestSize.Level1)
442 {
443     TestRemoteRestart();
444 }
445 
446 /**
447  * @tc.name: Online And Offline 004
448  * @tc.desc: Test functionality triggered by remote restart with thread pool
449  * @tc.type: FUNC
450  * @tc.require: AR000CQE0I
451  * @tc.author: zhangqiquan
452  */
453 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline004, TestSize.Level1)
454 {
455     auto threadPool = std::make_shared<ThreadPoolTestStub>();
456     RuntimeContext::GetInstance()->SetThreadPool(threadPool);
457     TestRemoteRestart();
458     RuntimeContext::GetInstance()->SetThreadPool(nullptr);
459 }
460 
461 /**
462  * @tc.name: Report Device Connect Change 001
463  * @tc.desc: Test CommunicatorAggregator support report device connect change event
464  * @tc.type: FUNC
465  * @tc.require: AR000DR9KV
466  * @tc.author: xiaozhenjian
467  */
468 HWTEST_F(DistributedDBCommunicatorTest, ReportDeviceConnectChange001, TestSize.Level1)
469 {
470     /**
471      * @tc.steps: step1. device A and device B register connect callback to CommunicatorAggregator
472      */
473     OnOfflineDevice onlineForA;
474     int errCode = g_envDeviceA.commAggrHandle->RegOnConnectCallback(
__anone94f6f3c0502(const std::string &target, bool isConnect) 475         [&onlineForA](const std::string &target, bool isConnect) {
476             HandleConnectChange(onlineForA, target, isConnect);
477         }, nullptr);
478     EXPECT_EQ(errCode, E_OK);
479     OnOfflineDevice onlineForB;
480     errCode = g_envDeviceB.commAggrHandle->RegOnConnectCallback(
__anone94f6f3c0602(const std::string &target, bool isConnect) 481         [&onlineForB](const std::string &target, bool isConnect) {
482             HandleConnectChange(onlineForB, target, isConnect);
483         }, nullptr);
484     EXPECT_EQ(errCode, E_OK);
485 
486     /**
487      * @tc.steps: step2. connect device A with device B
488      * @tc.expected: step2. device A callback B online; device B callback A online;
489      */
490     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
491     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
492     EXPECT_EQ(onlineForA.onlineDevices.size(), static_cast<size_t>(1));
493     EXPECT_EQ(onlineForB.onlineDevices.size(), static_cast<size_t>(1));
494     EXPECT_EQ(onlineForA.latestOnlineDevice, DEVICE_NAME_B);
495     EXPECT_EQ(onlineForB.latestOnlineDevice, DEVICE_NAME_A);
496 
497     /**
498      * @tc.steps: step3. connect device A with device B
499      * @tc.expected: step3. device A callback B offline; device B callback A offline;
500      */
501     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
502     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
503     EXPECT_EQ(onlineForA.onlineDevices.size(), static_cast<size_t>(0));
504     EXPECT_EQ(onlineForB.onlineDevices.size(), static_cast<size_t>(0));
505     EXPECT_EQ(onlineForA.latestOfflineDevice, DEVICE_NAME_B);
506     EXPECT_EQ(onlineForB.latestOfflineDevice, DEVICE_NAME_A);
507 
508     // Clean up
509     g_envDeviceA.commAggrHandle->RegOnConnectCallback(nullptr, nullptr);
510     g_envDeviceB.commAggrHandle->RegOnConnectCallback(nullptr, nullptr);
511 }
512 
513 namespace {
ToLabelType(uint64_t commLabel)514 LabelType ToLabelType(uint64_t commLabel)
515 {
516     uint64_t netOrderLabel = HostToNet(commLabel);
517     uint8_t *eachByte = reinterpret_cast<uint8_t *>(&netOrderLabel);
518     std::vector<uint8_t> realLabel(COMM_LABEL_LENGTH, 0);
519     for (int i = 0; i < static_cast<int>(sizeof(uint64_t)); i++) {
520         realLabel[i] = eachByte[i];
521     }
522     return realLabel;
523 }
524 }
525 
526 /**
527  * @tc.name: Report Communicator Not Found 001
528  * @tc.desc: Test CommunicatorAggregator support report communicator not found event
529  * @tc.type: FUNC
530  * @tc.require: AR000DR9KV
531  * @tc.author: xiaozhenjian
532  */
533 HWTEST_F(DistributedDBCommunicatorTest, ReportCommunicatorNotFound001, TestSize.Level1)
534 {
535     /**
536      * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
537      */
538     std::vector<LabelType> lackLabels;
539     int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(
__anone94f6f3c0802(const LabelType &commLabel, const std::string &userId)540         [&lackLabels](const LabelType &commLabel, const std::string &userId)->int {
541             lackLabels.push_back(commLabel);
542             return -E_NOT_FOUND;
543         }, nullptr);
544     EXPECT_EQ(errCode, E_OK);
545 
546     /**
547      * @tc.steps: step2. connect device A with device B
548      */
549     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
550     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
551 
552     /**
553      * @tc.steps: step3. device A alloc communicator AA using label A and send message to B
554      * @tc.expected: step3. device B callback that label A not found.
555      */
556     ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
557     ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
558     Message *msgForAA = BuildRegedTinyMessage();
559     ASSERT_NE(msgForAA, nullptr);
560     SendConfig conf = {true, false, 0};
561     errCode = commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
562     EXPECT_EQ(errCode, E_OK);
563     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
564     ASSERT_EQ(lackLabels.size(), static_cast<size_t>(1));
565     EXPECT_EQ(lackLabels[0], ToLabelType(LABEL_A));
566 
567     /**
568      * @tc.steps: step4. device B alloc communicator BA using label A and register message callback
569      * @tc.expected: step4. communicator BA will not receive message.
570      */
571     ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
572     ASSERT_NE(commBA, nullptr);
573     Message *recvMsgForBA = nullptr;
__anone94f6f3c0902(const std::string &srcTarget, Message *inMsg) 574     commBA->RegOnMessageCallback([&recvMsgForBA](const std::string &srcTarget, Message *inMsg) {
575         recvMsgForBA = inMsg;
576     }, nullptr);
577     commBA->Activate();
578     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
579     EXPECT_EQ(recvMsgForBA, nullptr);
580 
581     // Clean up
582     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
583     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
584     g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
585     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
586 }
587 
/*
 * Build a tiny registered message, tag it with `session`, and send it through the existing
 * communicator comm<src><label> to DEVICE_NAME_<dst>. The send result is stored in the
 * caller's `errCode` and expected to be E_OK. Uses a fatal assertion, so the enclosing
 * function returns if the message cannot be built.
 * Wrapped in do { } while (0) so that "DO_SEND_MESSAGE(...);" is a single statement
 * and stays valid inside an unbraced if/else.
 */
#define DO_SEND_MESSAGE(src, dst, label, session) \
do { \
    Message *msgFor##src##label = BuildRegedTinyMessage(); \
    ASSERT_NE(msgFor##src##label, nullptr); \
    msgFor##src##label->SetSessionId(session); \
    SendConfig conf = {true, false, 0}; \
    errCode = comm##src##label->SendMessage(DEVICE_NAME_##dst, msgFor##src##label, conf); \
    EXPECT_EQ(errCode, E_OK); \
} while (0)
597 
/*
 * Build a giant registered message of `size` and send it through the existing communicator
 * comm<src><label> to DEVICE_NAME_<dst>. The send result is stored in the caller's `errCode`
 * and expected to be E_OK. Uses a fatal assertion, so the enclosing function returns if the
 * message cannot be built. Note the first SendConfig field is false here, unlike DO_SEND_MESSAGE.
 * Wrapped in do { } while (0) so that "DO_SEND_GIANT_MESSAGE(...);" is a single statement
 * and stays valid inside an unbraced if/else.
 */
#define DO_SEND_GIANT_MESSAGE(src, dst, label, size) \
do { \
    Message *msgFor##src##label = BuildRegedGiantMessage(size); \
    ASSERT_NE(msgFor##src##label, nullptr); \
    SendConfig conf = {false, false, 0}; \
    errCode = comm##src##label->SendMessage(DEVICE_NAME_##dst, msgFor##src##label, conf); \
    EXPECT_EQ(errCode, E_OK); \
} while (0)
606 
/*
 * Declare comm<src><label> in the enclosing scope by allocating a communicator for
 * LABEL_<label> on g_envDevice<src>, assert it is usable, then send one tiny message with
 * `session` to DEVICE_NAME_<dst> via DO_SEND_MESSAGE. Deliberately not wrapped in a block:
 * the declared communicator must remain visible to the caller for later release.
 */
#define ALLOC_AND_SEND_MESSAGE(src, dst, label, session) \
    ICommunicator *comm##src##label = \
        g_envDevice##src.commAggrHandle->AllocCommunicator(LABEL_##label, errCode); \
    ASSERT_NOT_NULL_AND_ACTIVATE(comm##src##label); \
    DO_SEND_MESSAGE(src, dst, label, session)
611 
/*
 * Declare srcTargetFor<src><label> and recvMsgFor<src><label> in the enclosing scope and
 * register a message callback on comm<src><label> that stores the sender name and the
 * received message pointer into them. Both variables are captured by reference and are
 * deliberately left visible to the caller, so the macro is not wrapped in a block.
 */
#define REG_MESSAGE_CALLBACK(src, label) \
    std::string srcTargetFor##src##label; \
    Message *recvMsgFor##src##label = nullptr; \
    comm##src##label->RegOnMessageCallback( \
        [&srcTargetFor##src##label, &recvMsgFor##src##label](const std::string &srcTarget, Message *inMsg) { \
            srcTargetFor##src##label = srcTarget; \
            recvMsgFor##src##label = inMsg; \
        }, nullptr);
620 
621 /**
622  * @tc.name: ReDeliver Message 001
623  * @tc.desc: Test CommunicatorAggregator support redeliver message
624  * @tc.type: FUNC
625  * @tc.require: AR000DR9KV
626  * @tc.author: xiaozhenjian
627  */
628 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage001, TestSize.Level1)
629 {
630     /**
631      * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
632      */
633     std::vector<LabelType> lackLabels;
634     int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(
__anone94f6f3c0a02(const LabelType &commLabel, const std::string &userId)635         [&lackLabels](const LabelType &commLabel, const std::string &userId)->int {
636             lackLabels.push_back(commLabel);
637             return E_OK;
638         }, nullptr);
639     EXPECT_EQ(errCode, E_OK);
640 
641     /**
642      * @tc.steps: step2. connect device A with device B
643      */
644     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
645     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
646 
647     /**
648      * @tc.steps: step3. device A alloc communicator AA using label A and send message to B
649      * @tc.expected: step3. device B callback that label A not found.
650      */
651     ALLOC_AND_SEND_MESSAGE(A, B, A, 100); // session id 100
652     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
653     ASSERT_EQ(lackLabels.size(), static_cast<size_t>(1));
654     EXPECT_EQ(lackLabels[0], ToLabelType(LABEL_A));
655 
656     /**
657      * @tc.steps: step4. device A alloc communicator AB using label B and send message to B
658      * @tc.expected: step4. device B callback that label B not found.
659      */
660     ALLOC_AND_SEND_MESSAGE(A, B, B, 200); // session id 200
661     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
662     ASSERT_EQ(lackLabels.size(), static_cast<size_t>(2));
663     EXPECT_EQ(lackLabels[1], ToLabelType(LABEL_B)); // 1 for second element
664 
665     /**
666      * @tc.steps: step5. device B alloc communicator BA using label A and register message callback
667      * @tc.expected: step5. communicator BA will receive message.
668      */
669     ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
670     ASSERT_NE(commBA, nullptr);
671     REG_MESSAGE_CALLBACK(B, A);
672     commBA->Activate();
673     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
674     EXPECT_EQ(srcTargetForBA, DEVICE_NAME_A);
675     ASSERT_NE(recvMsgForBA, nullptr);
676     EXPECT_EQ(recvMsgForBA->GetSessionId(), 100U); // session id 100
677     delete recvMsgForBA;
678     recvMsgForBA = nullptr;
679 
680     /**
681      * @tc.steps: step6. device B alloc communicator BB using label B and register message callback
682      * @tc.expected: step6. communicator BB will receive message.
683      */
684     ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errCode);
685     ASSERT_NE(commBB, nullptr);
686     REG_MESSAGE_CALLBACK(B, B);
687     commBB->Activate();
688     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
689     EXPECT_EQ(srcTargetForBB, DEVICE_NAME_A);
690     ASSERT_NE(recvMsgForBB, nullptr);
691     EXPECT_EQ(recvMsgForBB->GetSessionId(), 200U); // session id 200
692     delete recvMsgForBB;
693     recvMsgForBB = nullptr;
694 
695     // Clean up
696     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
697     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
698     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
699     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
700     g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
701     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
702 }
703 
704 /**
705  * @tc.name: ReDeliver Message 002
706  * @tc.desc: Test CommunicatorAggregator support redeliver message by order
707  * @tc.type: FUNC
708  * @tc.require: AR000DR9KV
709  * @tc.author: xiaozhenjian
710  */
711 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage002, TestSize.Level1)
712 {
713     /**
714      * @tc.steps: step1. device C create CommunicatorAggregator and initialize
715      */
716     bool step1 = SetUpEnv(g_envDeviceC, DEVICE_NAME_C);
717     ASSERT_EQ(step1, true);
718 
719     /**
720      * @tc.steps: step2. device B register communicator not found callback to CommunicatorAggregator
721      */
722     int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback([](const LabelType &commLabel,
__anone94f6f3c0b02(const LabelType &commLabel, const std::string &userId)723         const std::string &userId)->int {
724         return E_OK;
725     }, nullptr);
726     EXPECT_EQ(errCode, E_OK);
727 
728     /**
729      * @tc.steps: step3. connect device A with device B, then device B with device C
730      */
731     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
732     AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
733     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
734 
735     /**
736      * @tc.steps: step4. device A alloc communicator AA using label A and send message to B
737      */
738     ALLOC_AND_SEND_MESSAGE(A, B, A, 100); // session id 100
739     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
740 
741     /**
742      * @tc.steps: step5. device C alloc communicator CA using label A and send message to B
743      */
744     ALLOC_AND_SEND_MESSAGE(C, B, A, 200); // session id 200
745     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
746     DO_SEND_MESSAGE(A, B, A, 300); // session id 300
747     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
748     DO_SEND_MESSAGE(C, B, A, 400); // session id 400
749     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
750 
751     /**
752      * @tc.steps: step6. device B alloc communicator BA using label A and register message callback
753      * @tc.expected: step6. communicator BA will receive message in order of sessionid 100, 200, 300, 400.
754      */
755     ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
756     ASSERT_NE(commBA, nullptr);
757     std::vector<std::pair<std::string, Message *>> msgCallbackForBA;
__anone94f6f3c0c02(const std::string &srcTarget, Message *inMsg) 758     commBA->RegOnMessageCallback([&msgCallbackForBA](const std::string &srcTarget, Message *inMsg) {
759         msgCallbackForBA.push_back({srcTarget, inMsg});
760     }, nullptr);
761     commBA->Activate();
762     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
763     ASSERT_EQ(msgCallbackForBA.size(), static_cast<size_t>(4)); // total 4 callback
764     EXPECT_EQ(msgCallbackForBA[0].first, DEVICE_NAME_A); // the 0 order element
765     EXPECT_EQ(msgCallbackForBA[1].first, DEVICE_NAME_C); // the 1 order element
766     EXPECT_EQ(msgCallbackForBA[2].first, DEVICE_NAME_A); // the 2 order element
767     EXPECT_EQ(msgCallbackForBA[3].first, DEVICE_NAME_C); // the 3 order element
768     for (uint32_t i = 0; i < msgCallbackForBA.size(); i++) {
769         EXPECT_EQ(msgCallbackForBA[i].second->GetSessionId(), static_cast<uint32_t>((i + 1) * 100)); // 1 sessionid 100
770         delete msgCallbackForBA[i].second;
771         msgCallbackForBA[i].second = nullptr;
772     }
773 
774     // Clean up
775     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
776     g_envDeviceC.commAggrHandle->ReleaseCommunicator(commCA);
777     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
778     g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
779     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
780     AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
781     TearDownEnv(g_envDeviceC);
782 }
783 
784 /**
785  * @tc.name: ReDeliver Message 003
786  * @tc.desc: For observe memory in unusual scenario
787  * @tc.type: FUNC
788  * @tc.require: AR000DR9KV
789  * @tc.author: xiaozhenjian
790  */
791 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage003, TestSize.Level2)
792 {
793     /**
794      * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
795      */
796     int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback([](const LabelType &commLabel,
__anone94f6f3c0d02(const LabelType &commLabel, const std::string &userId)797         const std::string &userId)->int {
798         return E_OK;
799     }, nullptr);
800     EXPECT_EQ(errCode, E_OK);
801 
802     /**
803      * @tc.steps: step2. connect device A with device B
804      */
805     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
806     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
807 
808     /**
809      * @tc.steps: step3. device A alloc communicator AA,AB,AC using label A,B,C
810      */
811     ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
812     ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
813     ICommunicator *commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errCode);
814     ASSERT_NOT_NULL_AND_ACTIVATE(commAB);
815     ICommunicator *commAC = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_C, errCode);
816     ASSERT_NOT_NULL_AND_ACTIVATE(commAC);
817     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
818 
819     /**
820      * @tc.steps: step4. device A Continuously send tiny message to B using communicator AA,AB,AC
821      */
822     for (int turn = 0; turn < 11; turn++) { // Total 11 turns
823         DO_SEND_MESSAGE(A, B, A, 0);
824         DO_SEND_MESSAGE(A, B, B, 0);
825         DO_SEND_MESSAGE(A, B, C, 0);
826     }
827 
828     /**
829      * @tc.steps: step5. device A Continuously send giant message to B using communicator AA,AB,AC
830      */
831     for (int turn = 0; turn < 5; turn++) { // Total 5 turns
832         DO_SEND_GIANT_MESSAGE(A, B, A, (3 * 1024 * 1024)); // 3 MBytes, 1024 is scale
833         DO_SEND_GIANT_MESSAGE(A, B, B, (6 * 1024 * 1024)); // 6 MBytes, 1024 is scale
834         DO_SEND_GIANT_MESSAGE(A, B, C, (7 * 1024 * 1024)); // 7 MBytes, 1024 is scale
835     }
836     DO_SEND_GIANT_MESSAGE(A, B, A, (30 * 1024 * 1024)); // 30 MBytes, 1024 is scale
837 
838     /**
839      * @tc.steps: step6. wait a long time then send last frame
840      */
841     for (int sec = 0; sec < 15; sec++) { // Total 15 s
842         std::this_thread::sleep_for(std::chrono::seconds(1)); // Sleep 1 s
843         LOGI("[UT][Test][ReDeliverMessage003] Sleep and wait=%d.", sec);
844     }
845     DO_SEND_MESSAGE(A, B, A, 0);
846     std::this_thread::sleep_for(std::chrono::seconds(1)); // Sleep 1 s
847 
848     // Clean up
849     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
850     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
851     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAC);
852     g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
853     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
854 }
855 }