• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 #include <thread>
18 #include "db_errno.h"
19 #include "distributeddb_communicator_common.h"
20 #include "distributeddb_tools_unit_test.h"
21 #include "endian_convert.h"
22 #include "log_print.h"
23 
24 using namespace std;
25 using namespace testing::ext;
26 using namespace DistributedDB;
27 
28 namespace {
29     EnvHandle g_envDeviceA;
30     EnvHandle g_envDeviceB;
31     EnvHandle g_envDeviceC;
32 }
33 
HandleConnectChange(OnOfflineDevice & onlines,const std::string & target,bool isConnect)34 static void HandleConnectChange(OnOfflineDevice &onlines, const std::string &target, bool isConnect)
35 {
36     if (isConnect) {
37         onlines.onlineDevices.insert(target);
38         onlines.latestOnlineDevice = target;
39         onlines.latestOfflineDevice.clear();
40     } else {
41         onlines.onlineDevices.erase(target);
42         onlines.latestOnlineDevice.clear();
43         onlines.latestOfflineDevice = target;
44     }
45 }
46 
47 class DistributedDBCommunicatorTest : public testing::Test {
48 public:
49     static void SetUpTestCase(void);
50     static void TearDownTestCase(void);
51     void SetUp();
52     void TearDown();
53 };
54 
SetUpTestCase(void)55 void DistributedDBCommunicatorTest::SetUpTestCase(void)
56 {
57     /**
58      * @tc.setup: Create and init CommunicatorAggregator and AdapterStub
59      */
60     LOGI("[UT][Test][SetUpTestCase] Enter.");
61     bool errCode = SetUpEnv(g_envDeviceA, DEVICE_NAME_A);
62     ASSERT_EQ(errCode, true);
63     errCode = SetUpEnv(g_envDeviceB, DEVICE_NAME_B);
64     ASSERT_EQ(errCode, true);
65     DoRegTransformFunction();
66     CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
67 }
68 
TearDownTestCase(void)69 void DistributedDBCommunicatorTest::TearDownTestCase(void)
70 {
71     /**
72      * @tc.teardown: Finalize and release CommunicatorAggregator and AdapterStub
73      */
74     LOGI("[UT][Test][TearDownTestCase] Enter.");
75     std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure all thread quiet and memory released
76     TearDownEnv(g_envDeviceA);
77     TearDownEnv(g_envDeviceB);
78     CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
79 }
80 
SetUp()81 void DistributedDBCommunicatorTest::SetUp()
82 {
83     DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
84 }
85 
TearDown()86 void DistributedDBCommunicatorTest::TearDown()
87 {
88     /**
89      * @tc.teardown: Wait 100 ms to make sure all thread quiet
90      */
91     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
92 }
93 
94 /**
95  * @tc.name: Communicator Management 001
96  * @tc.desc: Test alloc and release communicator
97  * @tc.type: FUNC
98  * @tc.require: AR000BVDGG AR000CQE0L
99  * @tc.author: xiaozhenjian
100  */
101 HWTEST_F(DistributedDBCommunicatorTest, CommunicatorManagement001, TestSize.Level1)
102 {
103     /**
104      * @tc.steps: step1. alloc communicator A using label A
105      * @tc.expected: step1. alloc return OK.
106      */
107     int errorNo = E_OK;
108     ICommunicator *commA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
109     EXPECT_EQ(errorNo, E_OK);
110     EXPECT_NE(commA, nullptr);
111 
112     /**
113      * @tc.steps: step2. alloc communicator B using label B
114      * @tc.expected: step2. alloc return OK.
115      */
116     errorNo = E_OK;
117     ICommunicator *commB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
118     EXPECT_EQ(errorNo, E_OK);
119     EXPECT_NE(commA, nullptr);
120 
121     /**
122      * @tc.steps: step3. alloc communicator C using label A
123      * @tc.expected: step3. alloc return not OK.
124      */
125     errorNo = E_OK;
126     ICommunicator *commC = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
127     EXPECT_NE(errorNo, E_OK);
128     EXPECT_EQ(commC, nullptr);
129 
130     /**
131      * @tc.steps: step4. release communicator A and communicator B
132      */
133     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commA);
134     commA = nullptr;
135     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commB);
136     commB = nullptr;
137 
138     /**
139      * @tc.steps: step5. alloc communicator D using label A
140      * @tc.expected: step5. alloc return OK.
141      */
142     errorNo = E_OK;
143     ICommunicator *commD = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
144     EXPECT_EQ(errorNo, E_OK);
145     EXPECT_NE(commD, nullptr);
146 
147     /**
148      * @tc.steps: step6. release communicator D
149      */
150     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commD);
151     commD = nullptr;
152 }
153 
ConnectWaitDisconnect()154 static void ConnectWaitDisconnect()
155 {
156     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
157     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
158     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
159 }
160 
161 /**
162  * @tc.name: Online And Offline 001
163  * @tc.desc: Test functionality triggered by physical devices online and offline
164  * @tc.type: FUNC
165  * @tc.require: AR000BVRNS AR000CQE0H
166  * @tc.author: wudongxing
167  */
168 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline001, TestSize.Level1)
169 {
170     /**
171      * @tc.steps: step1. device A alloc communicator AA using label A and register callback
172      * @tc.expected: step1. no callback.
173      */
174     int errorNo = E_OK;
175     ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
176     ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
177     OnOfflineDevice onlineForAA;
__anone4839d9d0202(const std::string &target, bool isConnect) 178     commAA->RegOnConnectCallback([&onlineForAA](const std::string &target, bool isConnect) {
179         HandleConnectChange(onlineForAA, target, isConnect);}, nullptr);
180     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
181 
182     /**
183      * @tc.steps: step2. connect device A with device B and then disconnect
184      * @tc.expected: step2. no callback.
185      */
186     ConnectWaitDisconnect();
187     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
188 
189     /**
190      * @tc.steps: step3. device B alloc communicator BB using label B and register callback
191      * @tc.expected: step3. no callback.
192      */
193     ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
194     ASSERT_NOT_NULL_AND_ACTIVATE(commBB);
195     OnOfflineDevice onlineForBB;
__anone4839d9d0302(const std::string &target, bool isConnect) 196     commBB->RegOnConnectCallback([&onlineForBB](const std::string &target, bool isConnect) {
197         HandleConnectChange(onlineForBB, target, isConnect);}, nullptr);
198     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
199     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
200 
201     /**
202      * @tc.steps: step4. connect device A with device B and then disconnect
203      * @tc.expected: step4. no callback.
204      */
205     ConnectWaitDisconnect();
206     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
207     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
208 
209     /**
210      * @tc.steps: step5. device B alloc communicator BA using label A and register callback
211      * @tc.expected: step5. no callback.
212      */
213     ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
214     ASSERT_NOT_NULL_AND_ACTIVATE(commBA);
215     OnOfflineDevice onlineForBA;
__anone4839d9d0402(const std::string &target, bool isConnect) 216     commBA->RegOnConnectCallback([&onlineForBA](const std::string &target, bool isConnect) {
217         HandleConnectChange(onlineForBA, target, isConnect);}, nullptr);
218     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
219     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
220     EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
221 
222     /**
223      * @tc.steps: step6. connect device A with device B
224      * @tc.expected: step6. communicator AA has callback of device B online;
225      *                      communicator BA has callback of device A online;
226      *                      communicator BB no callback
227      */
228     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
229     std::this_thread::sleep_for(std::chrono::milliseconds(100));
230     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
231     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
232     EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(1));
233     EXPECT_EQ(onlineForAA.latestOnlineDevice, DEVICE_NAME_B);
234     EXPECT_EQ(onlineForBA.latestOnlineDevice, DEVICE_NAME_A);
235 
236     /**
237      * @tc.steps: step7. disconnect device A with device B
238      * @tc.expected: step7. communicator AA has callback of device B offline;
239      *                      communicator BA has callback of device A offline;
240      *                      communicator BB no callback
241      */
242     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
243     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
244     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
245     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
246     EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
247     EXPECT_EQ(onlineForAA.latestOfflineDevice, DEVICE_NAME_B);
248     EXPECT_EQ(onlineForBA.latestOfflineDevice, DEVICE_NAME_A);
249 
250     // Clean up
251     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
252     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
253     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
254 }
255 
// Register on `communicator` a connect-change callback that forwards every notification to
// HandleConnectChange for `online`.
//   communicator: pointer expression of the communicator to register on
//   online:       name of the bookkeeping variable; it is captured by reference, so it has to stay
//                 alive for as long as the callback may be invoked
// Wrapped in do/while(0) so that the invocation together with its trailing ';' is exactly one
// statement (safe inside an unbraced if/else).
#define REG_CONNECT_CALLBACK(communicator, online) \
do { \
    (communicator)->RegOnConnectCallback([&online](const std::string &target, bool isConnect) { \
        HandleConnectChange(online, target, isConnect); \
    }, nullptr); \
} while (0)
262 
// Connect the adapter stubs of device A and device B, then sleep for `waitTime` milliseconds.
// Wrapped in do/while(0) so that the invocation together with its trailing ';' is exactly one
// statement (safe inside an unbraced if/else).
#define CONNECT_AND_WAIT(waitTime) \
do { \
    AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle); \
    std::this_thread::sleep_for(std::chrono::milliseconds(waitTime)); \
} while (0)
268 
269 /**
270  * @tc.name: Online And Offline 002
271  * @tc.desc: Test functionality triggered by alloc and release communicator
272  * @tc.type: FUNC
273  * @tc.require: AR000BVRNT AR000CQE0I
274  * @tc.author: wudongxing
275  */
276 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline002, TestSize.Level1)
277 {
278     /**
279      * @tc.steps: step1. connect device A with device B
280      */
281     CONNECT_AND_WAIT(200); // Sleep 200 ms
282 
283     /**
284      * @tc.steps: step2. device A alloc communicator AA using label A and register callback
285      * @tc.expected: step2. no callback.
286      */
287     int errorNo = E_OK;
288     ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
289     ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
290     OnOfflineDevice onlineForAA;
291     REG_CONNECT_CALLBACK(commAA, onlineForAA);
292     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
293 
294     /**
295      * @tc.steps: step3. device B alloc communicator BB using label B and register callback
296      * @tc.expected: step3. no callback.
297      */
298     ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
299     ASSERT_NOT_NULL_AND_ACTIVATE(commBB);
300     OnOfflineDevice onlineForBB;
301     REG_CONNECT_CALLBACK(commBB, onlineForBB);
302     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
303     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
304 
305     /**
306      * @tc.steps: step4. device B alloc communicator BA using label A and register callback
307      * @tc.expected: step4. communicator AA has callback of device B online;
308      *                      communicator BA has callback of device A online;
309      *                      communicator BB no callback.
310      */
311     ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
312     ASSERT_NOT_NULL_AND_ACTIVATE(commBA);
313     OnOfflineDevice onlineForBA;
314     REG_CONNECT_CALLBACK(commBA, onlineForBA);
315     std::this_thread::sleep_for(std::chrono::milliseconds(100));
316     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
317     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
318     EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(1));
319     EXPECT_EQ(onlineForAA.latestOnlineDevice, DEVICE_NAME_B);
320     EXPECT_EQ(onlineForBA.latestOnlineDevice, DEVICE_NAME_A);
321 
322     /**
323      * @tc.steps: step5. device A alloc communicator AB using label B and register callback
324      * @tc.expected: step5. communicator AB has callback of device B online;
325      *                      communicator BB has callback of device A online;
326      */
327     ICommunicator *commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
328     ASSERT_NOT_NULL_AND_ACTIVATE(commAB);
329     OnOfflineDevice onlineForAB;
330     REG_CONNECT_CALLBACK(commAB, onlineForAB);
331     std::this_thread::sleep_for(std::chrono::milliseconds(100));
332     EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
333     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
334     EXPECT_EQ(onlineForAB.latestOnlineDevice, DEVICE_NAME_B);
335     EXPECT_EQ(onlineForBB.latestOnlineDevice, DEVICE_NAME_A);
336 
337     /**
338      * @tc.steps: step6. device A release communicator AA
339      * @tc.expected: step6. communicator BA has callback of device A offline;
340      *                      communicator AB and BB no callback;
341      */
342     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
343     std::this_thread::sleep_for(std::chrono::milliseconds(100));
344     EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
345     EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
346     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
347     EXPECT_EQ(onlineForBA.latestOfflineDevice, DEVICE_NAME_A);
348 
349     /**
350      * @tc.steps: step7. device B release communicator BA
351      * @tc.expected: step7. communicator AB and BB no callback;
352      */
353     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
354     EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
355     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
356 
357     /**
358      * @tc.steps: step8. device B release communicator BB
359      * @tc.expected: step8. communicator AB has callback of device B offline;
360      */
361     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
362     std::this_thread::sleep_for(std::chrono::milliseconds(100));
363     EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(0));
364     EXPECT_EQ(onlineForAB.latestOfflineDevice, DEVICE_NAME_B);
365 
366     // Clean up
367     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
368     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
369 }
370 
371 /**
372  * @tc.name: Report Device Connect Change 001
373  * @tc.desc: Test CommunicatorAggregator support report device connect change event
374  * @tc.type: FUNC
375  * @tc.require: AR000DR9KV
376  * @tc.author: xiaozhenjian
377  */
378 HWTEST_F(DistributedDBCommunicatorTest, ReportDeviceConnectChange001, TestSize.Level1)
379 {
380     /**
381      * @tc.steps: step1. device A and device B register connect callback to CommunicatorAggregator
382      */
383     OnOfflineDevice onlineForA;
384     int errCode = g_envDeviceA.commAggrHandle->RegOnConnectCallback(
__anone4839d9d0502(const std::string &target, bool isConnect) 385         [&onlineForA](const std::string &target, bool isConnect) {
386             HandleConnectChange(onlineForA, target, isConnect);
387         }, nullptr);
388     EXPECT_EQ(errCode, E_OK);
389     OnOfflineDevice onlineForB;
390     errCode = g_envDeviceB.commAggrHandle->RegOnConnectCallback(
__anone4839d9d0602(const std::string &target, bool isConnect) 391         [&onlineForB](const std::string &target, bool isConnect) {
392             HandleConnectChange(onlineForB, target, isConnect);
393         }, nullptr);
394     EXPECT_EQ(errCode, E_OK);
395 
396     /**
397      * @tc.steps: step2. connect device A with device B
398      * @tc.expected: step2. device A callback B online; device B callback A online;
399      */
400     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
401     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
402     EXPECT_EQ(onlineForA.onlineDevices.size(), static_cast<size_t>(1));
403     EXPECT_EQ(onlineForB.onlineDevices.size(), static_cast<size_t>(1));
404     EXPECT_EQ(onlineForA.latestOnlineDevice, DEVICE_NAME_B);
405     EXPECT_EQ(onlineForB.latestOnlineDevice, DEVICE_NAME_A);
406 
407     /**
408      * @tc.steps: step3. connect device A with device B
409      * @tc.expected: step3. device A callback B offline; device B callback A offline;
410      */
411     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
412     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
413     EXPECT_EQ(onlineForA.onlineDevices.size(), static_cast<size_t>(0));
414     EXPECT_EQ(onlineForB.onlineDevices.size(), static_cast<size_t>(0));
415     EXPECT_EQ(onlineForA.latestOfflineDevice, DEVICE_NAME_B);
416     EXPECT_EQ(onlineForB.latestOfflineDevice, DEVICE_NAME_A);
417 
418     // Clean up
419     g_envDeviceA.commAggrHandle->RegOnConnectCallback(nullptr, nullptr);
420     g_envDeviceB.commAggrHandle->RegOnConnectCallback(nullptr, nullptr);
421 }
422 
423 namespace {
ToLabelType(uint64_t commLabel)424 LabelType ToLabelType(uint64_t commLabel)
425 {
426     uint64_t netOrderLabel = HostToNet(commLabel);
427     uint8_t *eachByte = reinterpret_cast<uint8_t *>(&netOrderLabel);
428     std::vector<uint8_t> realLabel(COMM_LABEL_LENGTH, 0);
429     for (int i = 0; i < static_cast<int>(sizeof(uint64_t)); i++) {
430         realLabel[i] = eachByte[i];
431     }
432     return realLabel;
433 }
434 }
435 
436 /**
437  * @tc.name: Report Communicator Not Found 001
438  * @tc.desc: Test CommunicatorAggregator support report communicator not found event
439  * @tc.type: FUNC
440  * @tc.require: AR000DR9KV
441  * @tc.author: xiaozhenjian
442  */
443 HWTEST_F(DistributedDBCommunicatorTest, ReportCommunicatorNotFound001, TestSize.Level1)
444 {
445     /**
446      * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
447      */
448     std::vector<LabelType> lackLabels;
449     int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(
__anone4839d9d0802(const LabelType &commLabel, const std::string &userId)450         [&lackLabels](const LabelType &commLabel, const std::string &userId)->int {
451             lackLabels.push_back(commLabel);
452             return -E_NOT_FOUND;
453         }, nullptr);
454     EXPECT_EQ(errCode, E_OK);
455 
456     /**
457      * @tc.steps: step2. connect device A with device B
458      */
459     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
460     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
461 
462     /**
463      * @tc.steps: step3. device A alloc communicator AA using label A and send message to B
464      * @tc.expected: step3. device B callback that label A not found.
465      */
466     ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
467     ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
468     Message *msgForAA = BuildRegedTinyMessage();
469     ASSERT_NE(msgForAA, nullptr);
470     SendConfig conf = {true, false, 0};
471     errCode = commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
472     EXPECT_EQ(errCode, E_OK);
473     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
474     ASSERT_EQ(lackLabels.size(), static_cast<size_t>(1));
475     EXPECT_EQ(lackLabels[0], ToLabelType(LABEL_A));
476 
477     /**
478      * @tc.steps: step4. device B alloc communicator BA using label A and register message callback
479      * @tc.expected: step4. communicator BA will not receive message.
480      */
481     ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
482     ASSERT_NE(commBA, nullptr);
483     Message *recvMsgForBA = nullptr;
__anone4839d9d0902(const std::string &srcTarget, Message *inMsg) 484     commBA->RegOnMessageCallback([&recvMsgForBA](const std::string &srcTarget, Message *inMsg) {
485         recvMsgForBA = inMsg;
486     }, nullptr);
487     commBA->Activate();
488     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
489     EXPECT_EQ(recvMsgForBA, nullptr);
490 
491     // Clean up
492     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
493     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
494     g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
495     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
496 }
497 
// Helper macros for the message cases below. They paste `src` and `label` into variable names:
// communicator `comm<src><label>`, message `msgFor<src><label>`, and `dst` into `DEVICE_NAME_<dst>`.
// All of them read or write an `int errCode` that must exist in the calling scope, and the
// sending ones contain ASSERT_NE, so they can only be used inside a function returning void.

// Build a tiny message with session id `session` and send it from comm<src><label> to DEVICE_NAME_<dst>
// with SendConfig {true, false, 0}; expects SendMessage to return E_OK.
// do/while(0) makes the invocation plus its trailing ';' exactly one statement.
#define DO_SEND_MESSAGE(src, dst, label, session) \
do { \
    Message *msgFor##src##label = BuildRegedTinyMessage(); \
    ASSERT_NE(msgFor##src##label, nullptr); \
    msgFor##src##label->SetSessionId((session)); \
    SendConfig conf = {true, false, 0}; \
    errCode = comm##src##label->SendMessage(DEVICE_NAME_##dst, msgFor##src##label, conf); \
    EXPECT_EQ(errCode, E_OK); \
} while (0)

// Same as DO_SEND_MESSAGE but with a giant message built from `size`, no session id, and
// SendConfig {false, false, 0}.
#define DO_SEND_GIANT_MESSAGE(src, dst, label, size) \
do { \
    Message *msgFor##src##label = BuildRegedGiantMessage((size)); \
    ASSERT_NE(msgFor##src##label, nullptr); \
    SendConfig conf = {false, false, 0}; \
    errCode = comm##src##label->SendMessage(DEVICE_NAME_##dst, msgFor##src##label, conf); \
    EXPECT_EQ(errCode, E_OK); \
} while (0)

// Declare comm<src><label> in the calling scope, allocate it on g_envDevice<src> with LABEL_<label>,
// activate it, then send one tiny message via DO_SEND_MESSAGE. Not wrapped in a block on purpose:
// the communicator variable has to stay visible to the caller for later sends and for the release.
#define ALLOC_AND_SEND_MESSAGE(src, dst, label, session) \
    ICommunicator *comm##src##label = g_envDevice##src.commAggrHandle->AllocCommunicator(LABEL_##label, errCode); \
    ASSERT_NOT_NULL_AND_ACTIVATE(comm##src##label); \
    DO_SEND_MESSAGE(src, dst, label, session)

// Declare srcTargetFor<src><label> and recvMsgFor<src><label> in the calling scope and register on
// comm<src><label> a message callback that stores the latest source device and message into them.
// Both variables are captured by reference and must stay alive while the callback can be invoked.
#define REG_MESSAGE_CALLBACK(src, label) \
    std::string srcTargetFor##src##label; \
    Message *recvMsgFor##src##label = nullptr; \
    comm##src##label->RegOnMessageCallback( \
        [&srcTargetFor##src##label, &recvMsgFor##src##label](const std::string &srcTarget, Message *inMsg) { \
        srcTargetFor##src##label = srcTarget; \
        recvMsgFor##src##label = inMsg; \
    }, nullptr);
530 
531 /**
532  * @tc.name: ReDeliver Message 001
533  * @tc.desc: Test CommunicatorAggregator support redeliver message
534  * @tc.type: FUNC
535  * @tc.require: AR000DR9KV
536  * @tc.author: xiaozhenjian
537  */
538 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage001, TestSize.Level1)
539 {
540     /**
541      * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
542      */
543     std::vector<LabelType> lackLabels;
544     int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(
__anone4839d9d0a02(const LabelType &commLabel, const std::string &userId)545         [&lackLabels](const LabelType &commLabel, const std::string &userId)->int {
546             lackLabels.push_back(commLabel);
547             return E_OK;
548         }, nullptr);
549     EXPECT_EQ(errCode, E_OK);
550 
551     /**
552      * @tc.steps: step2. connect device A with device B
553      */
554     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
555     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
556 
557     /**
558      * @tc.steps: step3. device A alloc communicator AA using label A and send message to B
559      * @tc.expected: step3. device B callback that label A not found.
560      */
561     ALLOC_AND_SEND_MESSAGE(A, B, A, 100); // session id 100
562     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
563     ASSERT_EQ(lackLabels.size(), static_cast<size_t>(1));
564     EXPECT_EQ(lackLabels[0], ToLabelType(LABEL_A));
565 
566     /**
567      * @tc.steps: step4. device A alloc communicator AB using label B and send message to B
568      * @tc.expected: step4. device B callback that label B not found.
569      */
570     ALLOC_AND_SEND_MESSAGE(A, B, B, 200); // session id 200
571     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
572     ASSERT_EQ(lackLabels.size(), static_cast<size_t>(2));
573     EXPECT_EQ(lackLabels[1], ToLabelType(LABEL_B)); // 1 for second element
574 
575     /**
576      * @tc.steps: step5. device B alloc communicator BA using label A and register message callback
577      * @tc.expected: step5. communicator BA will receive message.
578      */
579     ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
580     ASSERT_NE(commBA, nullptr);
581     REG_MESSAGE_CALLBACK(B, A);
582     commBA->Activate();
583     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
584     EXPECT_EQ(srcTargetForBA, DEVICE_NAME_A);
585     ASSERT_NE(recvMsgForBA, nullptr);
586     EXPECT_EQ(recvMsgForBA->GetSessionId(), 100U); // session id 100
587     delete recvMsgForBA;
588     recvMsgForBA = nullptr;
589 
590     /**
591      * @tc.steps: step6. device B alloc communicator BB using label B and register message callback
592      * @tc.expected: step6. communicator BB will receive message.
593      */
594     ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errCode);
595     ASSERT_NE(commBB, nullptr);
596     REG_MESSAGE_CALLBACK(B, B);
597     commBB->Activate();
598     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
599     EXPECT_EQ(srcTargetForBB, DEVICE_NAME_A);
600     ASSERT_NE(recvMsgForBB, nullptr);
601     EXPECT_EQ(recvMsgForBB->GetSessionId(), 200U); // session id 200
602     delete recvMsgForBB;
603     recvMsgForBB = nullptr;
604 
605     // Clean up
606     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
607     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
608     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
609     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
610     g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
611     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
612 }
613 
614 /**
615  * @tc.name: ReDeliver Message 002
616  * @tc.desc: Test CommunicatorAggregator support redeliver message by order
617  * @tc.type: FUNC
618  * @tc.require: AR000DR9KV
619  * @tc.author: xiaozhenjian
620  */
621 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage002, TestSize.Level1)
622 {
623     /**
624      * @tc.steps: step1. device C create CommunicatorAggregator and initialize
625      */
626     bool step1 = SetUpEnv(g_envDeviceC, DEVICE_NAME_C);
627     ASSERT_EQ(step1, true);
628 
629     /**
630      * @tc.steps: step2. device B register communicator not found callback to CommunicatorAggregator
631      */
632     int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback([](const LabelType &commLabel,
__anone4839d9d0b02(const LabelType &commLabel, const std::string &userId)633         const std::string &userId)->int {
634         return E_OK;
635     }, nullptr);
636     EXPECT_EQ(errCode, E_OK);
637 
638     /**
639      * @tc.steps: step3. connect device A with device B, then device B with device C
640      */
641     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
642     AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
643     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
644 
645     /**
646      * @tc.steps: step4. device A alloc communicator AA using label A and send message to B
647      */
648     ALLOC_AND_SEND_MESSAGE(A, B, A, 100); // session id 100
649     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
650 
651     /**
652      * @tc.steps: step5. device C alloc communicator CA using label A and send message to B
653      */
654     ALLOC_AND_SEND_MESSAGE(C, B, A, 200); // session id 200
655     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
656     DO_SEND_MESSAGE(A, B, A, 300); // session id 300
657     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
658     DO_SEND_MESSAGE(C, B, A, 400); // session id 400
659     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
660 
661     /**
662      * @tc.steps: step6. device B alloc communicator BA using label A and register message callback
663      * @tc.expected: step6. communicator BA will receive message in order of sessionid 100, 200, 300, 400.
664      */
665     ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
666     ASSERT_NE(commBA, nullptr);
667     std::vector<std::pair<std::string, Message *>> msgCallbackForBA;
__anone4839d9d0c02(const std::string &srcTarget, Message *inMsg) 668     commBA->RegOnMessageCallback([&msgCallbackForBA](const std::string &srcTarget, Message *inMsg) {
669         msgCallbackForBA.push_back({srcTarget, inMsg});
670     }, nullptr);
671     commBA->Activate();
672     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
673     ASSERT_EQ(msgCallbackForBA.size(), static_cast<size_t>(4)); // total 4 callback
674     EXPECT_EQ(msgCallbackForBA[0].first, DEVICE_NAME_A); // the 0 order element
675     EXPECT_EQ(msgCallbackForBA[1].first, DEVICE_NAME_C); // the 1 order element
676     EXPECT_EQ(msgCallbackForBA[2].first, DEVICE_NAME_A); // the 2 order element
677     EXPECT_EQ(msgCallbackForBA[3].first, DEVICE_NAME_C); // the 3 order element
678     for (uint32_t i = 0; i < msgCallbackForBA.size(); i++) {
679         EXPECT_EQ(msgCallbackForBA[i].second->GetSessionId(), static_cast<uint32_t>((i + 1) * 100)); // 1 sessionid 100
680         delete msgCallbackForBA[i].second;
681         msgCallbackForBA[i].second = nullptr;
682     }
683 
684     // Clean up
685     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
686     g_envDeviceC.commAggrHandle->ReleaseCommunicator(commCA);
687     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
688     g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
689     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
690     AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
691     TearDownEnv(g_envDeviceC);
692 }
693 
694 /**
695  * @tc.name: ReDeliver Message 003
696  * @tc.desc: For observe memory in unusual scenario
697  * @tc.type: FUNC
698  * @tc.require: AR000DR9KV
699  * @tc.author: xiaozhenjian
700  */
701 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage003, TestSize.Level2)
702 {
703     /**
704      * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
705      */
706     int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback([](const LabelType &commLabel,
__anone4839d9d0d02(const LabelType &commLabel, const std::string &userId)707         const std::string &userId)->int {
708         return E_OK;
709     }, nullptr);
710     EXPECT_EQ(errCode, E_OK);
711 
712     /**
713      * @tc.steps: step2. connect device A with device B
714      */
715     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
716     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
717 
718     /**
719      * @tc.steps: step3. device A alloc communicator AA,AB,AC using label A,B,C
720      */
721     ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
722     ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
723     ICommunicator *commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errCode);
724     ASSERT_NOT_NULL_AND_ACTIVATE(commAB);
725     ICommunicator *commAC = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_C, errCode);
726     ASSERT_NOT_NULL_AND_ACTIVATE(commAC);
727     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
728 
729     /**
730      * @tc.steps: step4. device A Continuously send tiny message to B using communicator AA,AB,AC
731      */
732     for (int turn = 0; turn < 11; turn++) { // Total 11 turns
733         DO_SEND_MESSAGE(A, B, A, 0);
734         DO_SEND_MESSAGE(A, B, B, 0);
735         DO_SEND_MESSAGE(A, B, C, 0);
736     }
737 
738     /**
739      * @tc.steps: step5. device A Continuously send giant message to B using communicator AA,AB,AC
740      */
741     for (int turn = 0; turn < 5; turn++) { // Total 5 turns
742         DO_SEND_GIANT_MESSAGE(A, B, A, (3 * 1024 * 1024)); // 3 MBytes, 1024 is scale
743         DO_SEND_GIANT_MESSAGE(A, B, B, (6 * 1024 * 1024)); // 6 MBytes, 1024 is scale
744         DO_SEND_GIANT_MESSAGE(A, B, C, (7 * 1024 * 1024)); // 7 MBytes, 1024 is scale
745     }
746     DO_SEND_GIANT_MESSAGE(A, B, A, (30 * 1024 * 1024)); // 30 MBytes, 1024 is scale
747 
748     /**
749      * @tc.steps: step6. wait a long time then send last frame
750      */
751     for (int sec = 0; sec < 15; sec++) { // Total 15 s
752         std::this_thread::sleep_for(std::chrono::seconds(1)); // Sleep 1 s
753         LOGI("[UT][Test][ReDeliverMessage003] Sleep and wait=%d.", sec);
754     }
755     DO_SEND_MESSAGE(A, B, A, 0);
756     std::this_thread::sleep_for(std::chrono::seconds(1)); // Sleep 1 s
757 
758     // Clean up
759     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
760     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
761     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAC);
762     g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
763     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
764 }
765