1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17 #include <thread>
18 #include "db_errno.h"
19 #include "distributeddb_communicator_common.h"
20 #include "distributeddb_tools_unit_test.h"
21 #include "endian_convert.h"
22 #include "log_print.h"
23
24 using namespace std;
25 using namespace testing::ext;
26 using namespace DistributedDB;
27
28 namespace {
29 EnvHandle g_envDeviceA;
30 EnvHandle g_envDeviceB;
31 EnvHandle g_envDeviceC;
32 }
33
HandleConnectChange(OnOfflineDevice & onlines,const std::string & target,bool isConnect)34 static void HandleConnectChange(OnOfflineDevice &onlines, const std::string &target, bool isConnect)
35 {
36 if (isConnect) {
37 onlines.onlineDevices.insert(target);
38 onlines.latestOnlineDevice = target;
39 onlines.latestOfflineDevice.clear();
40 } else {
41 onlines.onlineDevices.erase(target);
42 onlines.latestOnlineDevice.clear();
43 onlines.latestOfflineDevice = target;
44 }
45 }
46
47 class DistributedDBCommunicatorTest : public testing::Test {
48 public:
49 static void SetUpTestCase(void);
50 static void TearDownTestCase(void);
51 void SetUp();
52 void TearDown();
53 };
54
SetUpTestCase(void)55 void DistributedDBCommunicatorTest::SetUpTestCase(void)
56 {
57 /**
58 * @tc.setup: Create and init CommunicatorAggregator and AdapterStub
59 */
60 LOGI("[UT][Test][SetUpTestCase] Enter.");
61 bool errCode = SetUpEnv(g_envDeviceA, DEVICE_NAME_A);
62 ASSERT_EQ(errCode, true);
63 errCode = SetUpEnv(g_envDeviceB, DEVICE_NAME_B);
64 ASSERT_EQ(errCode, true);
65 DoRegTransformFunction();
66 CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
67 }
68
TearDownTestCase(void)69 void DistributedDBCommunicatorTest::TearDownTestCase(void)
70 {
71 /**
72 * @tc.teardown: Finalize and release CommunicatorAggregator and AdapterStub
73 */
74 LOGI("[UT][Test][TearDownTestCase] Enter.");
75 std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure all thread quiet and memory released
76 TearDownEnv(g_envDeviceA);
77 TearDownEnv(g_envDeviceB);
78 CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
79 }
80
SetUp()81 void DistributedDBCommunicatorTest::SetUp()
82 {
83 DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
84 }
85
TearDown()86 void DistributedDBCommunicatorTest::TearDown()
87 {
88 /**
89 * @tc.teardown: Wait 100 ms to make sure all thread quiet
90 */
91 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
92 }
93
94 /**
95 * @tc.name: Communicator Management 001
96 * @tc.desc: Test alloc and release communicator
97 * @tc.type: FUNC
98 * @tc.require: AR000BVDGG AR000CQE0L
99 * @tc.author: xiaozhenjian
100 */
101 HWTEST_F(DistributedDBCommunicatorTest, CommunicatorManagement001, TestSize.Level1)
102 {
103 /**
104 * @tc.steps: step1. alloc communicator A using label A
105 * @tc.expected: step1. alloc return OK.
106 */
107 int errorNo = E_OK;
108 ICommunicator *commA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
109 EXPECT_EQ(errorNo, E_OK);
110 EXPECT_NE(commA, nullptr);
111
112 /**
113 * @tc.steps: step2. alloc communicator B using label B
114 * @tc.expected: step2. alloc return OK.
115 */
116 errorNo = E_OK;
117 ICommunicator *commB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
118 EXPECT_EQ(errorNo, E_OK);
119 EXPECT_NE(commA, nullptr);
120
121 /**
122 * @tc.steps: step3. alloc communicator C using label A
123 * @tc.expected: step3. alloc return not OK.
124 */
125 errorNo = E_OK;
126 ICommunicator *commC = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
127 EXPECT_NE(errorNo, E_OK);
128 EXPECT_EQ(commC, nullptr);
129
130 /**
131 * @tc.steps: step4. release communicator A and communicator B
132 */
133 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commA);
134 commA = nullptr;
135 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commB);
136 commB = nullptr;
137
138 /**
139 * @tc.steps: step5. alloc communicator D using label A
140 * @tc.expected: step5. alloc return OK.
141 */
142 errorNo = E_OK;
143 ICommunicator *commD = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
144 EXPECT_EQ(errorNo, E_OK);
145 EXPECT_NE(commD, nullptr);
146
147 /**
148 * @tc.steps: step6. release communicator D
149 */
150 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commD);
151 commD = nullptr;
152 }
153
ConnectWaitDisconnect()154 static void ConnectWaitDisconnect()
155 {
156 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
157 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
158 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
159 }
160
161 /**
162 * @tc.name: Online And Offline 001
163 * @tc.desc: Test functionality triggered by physical devices online and offline
164 * @tc.type: FUNC
165 * @tc.require: AR000BVRNS AR000CQE0H
166 * @tc.author: wudongxing
167 */
168 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline001, TestSize.Level1)
169 {
170 /**
171 * @tc.steps: step1. device A alloc communicator AA using label A and register callback
172 * @tc.expected: step1. no callback.
173 */
174 int errorNo = E_OK;
175 ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
176 ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
177 OnOfflineDevice onlineForAA;
__anone4839d9d0202(const std::string &target, bool isConnect) 178 commAA->RegOnConnectCallback([&onlineForAA](const std::string &target, bool isConnect) {
179 HandleConnectChange(onlineForAA, target, isConnect);}, nullptr);
180 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
181
182 /**
183 * @tc.steps: step2. connect device A with device B and then disconnect
184 * @tc.expected: step2. no callback.
185 */
186 ConnectWaitDisconnect();
187 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
188
189 /**
190 * @tc.steps: step3. device B alloc communicator BB using label B and register callback
191 * @tc.expected: step3. no callback.
192 */
193 ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
194 ASSERT_NOT_NULL_AND_ACTIVATE(commBB);
195 OnOfflineDevice onlineForBB;
__anone4839d9d0302(const std::string &target, bool isConnect) 196 commBB->RegOnConnectCallback([&onlineForBB](const std::string &target, bool isConnect) {
197 HandleConnectChange(onlineForBB, target, isConnect);}, nullptr);
198 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
199 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
200
201 /**
202 * @tc.steps: step4. connect device A with device B and then disconnect
203 * @tc.expected: step4. no callback.
204 */
205 ConnectWaitDisconnect();
206 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
207 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
208
209 /**
210 * @tc.steps: step5. device B alloc communicator BA using label A and register callback
211 * @tc.expected: step5. no callback.
212 */
213 ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
214 ASSERT_NOT_NULL_AND_ACTIVATE(commBA);
215 OnOfflineDevice onlineForBA;
__anone4839d9d0402(const std::string &target, bool isConnect) 216 commBA->RegOnConnectCallback([&onlineForBA](const std::string &target, bool isConnect) {
217 HandleConnectChange(onlineForBA, target, isConnect);}, nullptr);
218 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
219 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
220 EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
221
222 /**
223 * @tc.steps: step6. connect device A with device B
224 * @tc.expected: step6. communicator AA has callback of device B online;
225 * communicator BA has callback of device A online;
226 * communicator BB no callback
227 */
228 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
229 std::this_thread::sleep_for(std::chrono::milliseconds(100));
230 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
231 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
232 EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(1));
233 EXPECT_EQ(onlineForAA.latestOnlineDevice, DEVICE_NAME_B);
234 EXPECT_EQ(onlineForBA.latestOnlineDevice, DEVICE_NAME_A);
235
236 /**
237 * @tc.steps: step7. disconnect device A with device B
238 * @tc.expected: step7. communicator AA has callback of device B offline;
239 * communicator BA has callback of device A offline;
240 * communicator BB no callback
241 */
242 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
243 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
244 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
245 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
246 EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
247 EXPECT_EQ(onlineForAA.latestOfflineDevice, DEVICE_NAME_B);
248 EXPECT_EQ(onlineForBA.latestOfflineDevice, DEVICE_NAME_A);
249
250 // Clean up
251 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
252 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
253 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
254 }
255
// Register a connect-change callback on `communicator` that forwards every notification to
// HandleConnectChange for the record `online`.
// `online` is captured by reference, so it must name a variable that stays alive while the
// callback is registered. `communicator` is parenthesized so any pointer expression can be passed.
#define REG_CONNECT_CALLBACK(communicator, online) \
    { \
        (communicator)->RegOnConnectCallback( \
            [&online](const std::string &target, bool isConnect) { \
                HandleConnectChange(online, target, isConnect); \
            }, nullptr); \
    }
262
// Connect the adapter stubs of device A and device B, then sleep for `waitMs` milliseconds.
#define CONNECT_AND_WAIT(waitMs) \
    { \
        AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle); \
        std::this_thread::sleep_for(std::chrono::milliseconds(waitMs)); \
    }
268
269 /**
270 * @tc.name: Online And Offline 002
271 * @tc.desc: Test functionality triggered by alloc and release communicator
272 * @tc.type: FUNC
273 * @tc.require: AR000BVRNT AR000CQE0I
274 * @tc.author: wudongxing
275 */
276 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline002, TestSize.Level1)
277 {
278 /**
279 * @tc.steps: step1. connect device A with device B
280 */
281 CONNECT_AND_WAIT(200); // Sleep 200 ms
282
283 /**
284 * @tc.steps: step2. device A alloc communicator AA using label A and register callback
285 * @tc.expected: step2. no callback.
286 */
287 int errorNo = E_OK;
288 ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
289 ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
290 OnOfflineDevice onlineForAA;
291 REG_CONNECT_CALLBACK(commAA, onlineForAA);
292 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
293
294 /**
295 * @tc.steps: step3. device B alloc communicator BB using label B and register callback
296 * @tc.expected: step3. no callback.
297 */
298 ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
299 ASSERT_NOT_NULL_AND_ACTIVATE(commBB);
300 OnOfflineDevice onlineForBB;
301 REG_CONNECT_CALLBACK(commBB, onlineForBB);
302 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
303 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
304
305 /**
306 * @tc.steps: step4. device B alloc communicator BA using label A and register callback
307 * @tc.expected: step4. communicator AA has callback of device B online;
308 * communicator BA has callback of device A online;
309 * communicator BB no callback.
310 */
311 ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
312 ASSERT_NOT_NULL_AND_ACTIVATE(commBA);
313 OnOfflineDevice onlineForBA;
314 REG_CONNECT_CALLBACK(commBA, onlineForBA);
315 std::this_thread::sleep_for(std::chrono::milliseconds(100));
316 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
317 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
318 EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(1));
319 EXPECT_EQ(onlineForAA.latestOnlineDevice, DEVICE_NAME_B);
320 EXPECT_EQ(onlineForBA.latestOnlineDevice, DEVICE_NAME_A);
321
322 /**
323 * @tc.steps: step5. device A alloc communicator AB using label B and register callback
324 * @tc.expected: step5. communicator AB has callback of device B online;
325 * communicator BB has callback of device A online;
326 */
327 ICommunicator *commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
328 ASSERT_NOT_NULL_AND_ACTIVATE(commAB);
329 OnOfflineDevice onlineForAB;
330 REG_CONNECT_CALLBACK(commAB, onlineForAB);
331 std::this_thread::sleep_for(std::chrono::milliseconds(100));
332 EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
333 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
334 EXPECT_EQ(onlineForAB.latestOnlineDevice, DEVICE_NAME_B);
335 EXPECT_EQ(onlineForBB.latestOnlineDevice, DEVICE_NAME_A);
336
337 /**
338 * @tc.steps: step6. device A release communicator AA
339 * @tc.expected: step6. communicator BA has callback of device A offline;
340 * communicator AB and BB no callback;
341 */
342 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
343 std::this_thread::sleep_for(std::chrono::milliseconds(100));
344 EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
345 EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
346 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
347 EXPECT_EQ(onlineForBA.latestOfflineDevice, DEVICE_NAME_A);
348
349 /**
350 * @tc.steps: step7. device B release communicator BA
351 * @tc.expected: step7. communicator AB and BB no callback;
352 */
353 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
354 EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
355 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
356
357 /**
358 * @tc.steps: step8. device B release communicator BB
359 * @tc.expected: step8. communicator AB has callback of device B offline;
360 */
361 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
362 std::this_thread::sleep_for(std::chrono::milliseconds(100));
363 EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(0));
364 EXPECT_EQ(onlineForAB.latestOfflineDevice, DEVICE_NAME_B);
365
366 // Clean up
367 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
368 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
369 }
370
371 /**
372 * @tc.name: Report Device Connect Change 001
373 * @tc.desc: Test CommunicatorAggregator support report device connect change event
374 * @tc.type: FUNC
375 * @tc.require: AR000DR9KV
376 * @tc.author: xiaozhenjian
377 */
378 HWTEST_F(DistributedDBCommunicatorTest, ReportDeviceConnectChange001, TestSize.Level1)
379 {
380 /**
381 * @tc.steps: step1. device A and device B register connect callback to CommunicatorAggregator
382 */
383 OnOfflineDevice onlineForA;
384 int errCode = g_envDeviceA.commAggrHandle->RegOnConnectCallback(
__anone4839d9d0502(const std::string &target, bool isConnect) 385 [&onlineForA](const std::string &target, bool isConnect) {
386 HandleConnectChange(onlineForA, target, isConnect);
387 }, nullptr);
388 EXPECT_EQ(errCode, E_OK);
389 OnOfflineDevice onlineForB;
390 errCode = g_envDeviceB.commAggrHandle->RegOnConnectCallback(
__anone4839d9d0602(const std::string &target, bool isConnect) 391 [&onlineForB](const std::string &target, bool isConnect) {
392 HandleConnectChange(onlineForB, target, isConnect);
393 }, nullptr);
394 EXPECT_EQ(errCode, E_OK);
395
396 /**
397 * @tc.steps: step2. connect device A with device B
398 * @tc.expected: step2. device A callback B online; device B callback A online;
399 */
400 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
401 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
402 EXPECT_EQ(onlineForA.onlineDevices.size(), static_cast<size_t>(1));
403 EXPECT_EQ(onlineForB.onlineDevices.size(), static_cast<size_t>(1));
404 EXPECT_EQ(onlineForA.latestOnlineDevice, DEVICE_NAME_B);
405 EXPECT_EQ(onlineForB.latestOnlineDevice, DEVICE_NAME_A);
406
407 /**
408 * @tc.steps: step3. connect device A with device B
409 * @tc.expected: step3. device A callback B offline; device B callback A offline;
410 */
411 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
412 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
413 EXPECT_EQ(onlineForA.onlineDevices.size(), static_cast<size_t>(0));
414 EXPECT_EQ(onlineForB.onlineDevices.size(), static_cast<size_t>(0));
415 EXPECT_EQ(onlineForA.latestOfflineDevice, DEVICE_NAME_B);
416 EXPECT_EQ(onlineForB.latestOfflineDevice, DEVICE_NAME_A);
417
418 // Clean up
419 g_envDeviceA.commAggrHandle->RegOnConnectCallback(nullptr, nullptr);
420 g_envDeviceB.commAggrHandle->RegOnConnectCallback(nullptr, nullptr);
421 }
422
423 namespace {
ToLabelType(uint64_t commLabel)424 LabelType ToLabelType(uint64_t commLabel)
425 {
426 uint64_t netOrderLabel = HostToNet(commLabel);
427 uint8_t *eachByte = reinterpret_cast<uint8_t *>(&netOrderLabel);
428 std::vector<uint8_t> realLabel(COMM_LABEL_LENGTH, 0);
429 for (int i = 0; i < static_cast<int>(sizeof(uint64_t)); i++) {
430 realLabel[i] = eachByte[i];
431 }
432 return realLabel;
433 }
434 }
435
436 /**
437 * @tc.name: Report Communicator Not Found 001
438 * @tc.desc: Test CommunicatorAggregator support report communicator not found event
439 * @tc.type: FUNC
440 * @tc.require: AR000DR9KV
441 * @tc.author: xiaozhenjian
442 */
443 HWTEST_F(DistributedDBCommunicatorTest, ReportCommunicatorNotFound001, TestSize.Level1)
444 {
445 /**
446 * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
447 */
448 std::vector<LabelType> lackLabels;
449 int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(
__anone4839d9d0802(const LabelType &commLabel, const std::string &userId)450 [&lackLabels](const LabelType &commLabel, const std::string &userId)->int {
451 lackLabels.push_back(commLabel);
452 return -E_NOT_FOUND;
453 }, nullptr);
454 EXPECT_EQ(errCode, E_OK);
455
456 /**
457 * @tc.steps: step2. connect device A with device B
458 */
459 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
460 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
461
462 /**
463 * @tc.steps: step3. device A alloc communicator AA using label A and send message to B
464 * @tc.expected: step3. device B callback that label A not found.
465 */
466 ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
467 ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
468 Message *msgForAA = BuildRegedTinyMessage();
469 ASSERT_NE(msgForAA, nullptr);
470 SendConfig conf = {true, false, 0};
471 errCode = commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
472 EXPECT_EQ(errCode, E_OK);
473 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
474 ASSERT_EQ(lackLabels.size(), static_cast<size_t>(1));
475 EXPECT_EQ(lackLabels[0], ToLabelType(LABEL_A));
476
477 /**
478 * @tc.steps: step4. device B alloc communicator BA using label A and register message callback
479 * @tc.expected: step4. communicator BA will not receive message.
480 */
481 ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
482 ASSERT_NE(commBA, nullptr);
483 Message *recvMsgForBA = nullptr;
__anone4839d9d0902(const std::string &srcTarget, Message *inMsg) 484 commBA->RegOnMessageCallback([&recvMsgForBA](const std::string &srcTarget, Message *inMsg) {
485 recvMsgForBA = inMsg;
486 }, nullptr);
487 commBA->Activate();
488 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
489 EXPECT_EQ(recvMsgForBA, nullptr);
490
491 // Clean up
492 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
493 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
494 g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
495 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
496 }
497
// Build a tiny message, stamp it with `session` and send it through comm<src><label> to
// DEVICE_NAME_<dst>. The expansion site must provide an `int errCode` variable and the
// communicator variable; a null message aborts the enclosing test through ASSERT_NE.
#define DO_SEND_MESSAGE(src, dst, label, session) \
    { \
        Message *tinyMsg = BuildRegedTinyMessage(); \
        ASSERT_NE(tinyMsg, nullptr); \
        tinyMsg->SetSessionId(session); \
        SendConfig sendConf = {true, false, 0}; \
        errCode = comm##src##label->SendMessage(DEVICE_NAME_##dst, tinyMsg, sendConf); \
        EXPECT_EQ(errCode, E_OK); \
    }
507
// Build a giant message of `size` and send it through comm<src><label> to DEVICE_NAME_<dst>.
// Its send configuration differs from DO_SEND_MESSAGE in the first field (false here).
// The expansion site must provide an `int errCode` variable and the communicator variable.
#define DO_SEND_GIANT_MESSAGE(src, dst, label, size) \
    { \
        Message *giantMsg = BuildRegedGiantMessage(size); \
        ASSERT_NE(giantMsg, nullptr); \
        SendConfig sendConf = {false, false, 0}; \
        errCode = comm##src##label->SendMessage(DEVICE_NAME_##dst, giantMsg, sendConf); \
        EXPECT_EQ(errCode, E_OK); \
    }
516
// Declare comm<src><label> in the enclosing scope by allocating a communicator for
// LABEL_<label> on g_envDevice<src>, activate it, then send one tiny message with the given
// session id to DEVICE_NAME_<dst>. Not wrapped in braces on purpose: the communicator
// variable has to stay visible to the rest of the test case.
#define ALLOC_AND_SEND_MESSAGE(src, dst, label, session) \
    ICommunicator *comm##src##label = \
        g_envDevice##src.commAggrHandle->AllocCommunicator(LABEL_##label, errCode); \
    ASSERT_NOT_NULL_AND_ACTIVATE(comm##src##label); \
    DO_SEND_MESSAGE(src, dst, label, session)
521
// Declare srcTargetFor<src><label> and recvMsgFor<src><label> in the enclosing scope and
// register a message callback on comm<src><label> that stores the sender name and the
// message pointer of each call into them (later calls overwrite earlier ones).
#define REG_MESSAGE_CALLBACK(src, label) \
    std::string srcTargetFor##src##label; \
    Message *recvMsgFor##src##label = nullptr; \
    comm##src##label->RegOnMessageCallback( \
        [&srcTargetFor##src##label, &recvMsgFor##src##label](const std::string &sender, Message *received) { \
            srcTargetFor##src##label = sender; \
            recvMsgFor##src##label = received; \
        }, nullptr);
530
531 /**
532 * @tc.name: ReDeliver Message 001
533 * @tc.desc: Test CommunicatorAggregator support redeliver message
534 * @tc.type: FUNC
535 * @tc.require: AR000DR9KV
536 * @tc.author: xiaozhenjian
537 */
538 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage001, TestSize.Level1)
539 {
540 /**
541 * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
542 */
543 std::vector<LabelType> lackLabels;
544 int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(
__anone4839d9d0a02(const LabelType &commLabel, const std::string &userId)545 [&lackLabels](const LabelType &commLabel, const std::string &userId)->int {
546 lackLabels.push_back(commLabel);
547 return E_OK;
548 }, nullptr);
549 EXPECT_EQ(errCode, E_OK);
550
551 /**
552 * @tc.steps: step2. connect device A with device B
553 */
554 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
555 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
556
557 /**
558 * @tc.steps: step3. device A alloc communicator AA using label A and send message to B
559 * @tc.expected: step3. device B callback that label A not found.
560 */
561 ALLOC_AND_SEND_MESSAGE(A, B, A, 100); // session id 100
562 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
563 ASSERT_EQ(lackLabels.size(), static_cast<size_t>(1));
564 EXPECT_EQ(lackLabels[0], ToLabelType(LABEL_A));
565
566 /**
567 * @tc.steps: step4. device A alloc communicator AB using label B and send message to B
568 * @tc.expected: step4. device B callback that label B not found.
569 */
570 ALLOC_AND_SEND_MESSAGE(A, B, B, 200); // session id 200
571 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
572 ASSERT_EQ(lackLabels.size(), static_cast<size_t>(2));
573 EXPECT_EQ(lackLabels[1], ToLabelType(LABEL_B)); // 1 for second element
574
575 /**
576 * @tc.steps: step5. device B alloc communicator BA using label A and register message callback
577 * @tc.expected: step5. communicator BA will receive message.
578 */
579 ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
580 ASSERT_NE(commBA, nullptr);
581 REG_MESSAGE_CALLBACK(B, A);
582 commBA->Activate();
583 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
584 EXPECT_EQ(srcTargetForBA, DEVICE_NAME_A);
585 ASSERT_NE(recvMsgForBA, nullptr);
586 EXPECT_EQ(recvMsgForBA->GetSessionId(), 100U); // session id 100
587 delete recvMsgForBA;
588 recvMsgForBA = nullptr;
589
590 /**
591 * @tc.steps: step6. device B alloc communicator BB using label B and register message callback
592 * @tc.expected: step6. communicator BB will receive message.
593 */
594 ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errCode);
595 ASSERT_NE(commBB, nullptr);
596 REG_MESSAGE_CALLBACK(B, B);
597 commBB->Activate();
598 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
599 EXPECT_EQ(srcTargetForBB, DEVICE_NAME_A);
600 ASSERT_NE(recvMsgForBB, nullptr);
601 EXPECT_EQ(recvMsgForBB->GetSessionId(), 200U); // session id 200
602 delete recvMsgForBB;
603 recvMsgForBB = nullptr;
604
605 // Clean up
606 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
607 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
608 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
609 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
610 g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
611 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
612 }
613
614 /**
615 * @tc.name: ReDeliver Message 002
616 * @tc.desc: Test CommunicatorAggregator support redeliver message by order
617 * @tc.type: FUNC
618 * @tc.require: AR000DR9KV
619 * @tc.author: xiaozhenjian
620 */
621 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage002, TestSize.Level1)
622 {
623 /**
624 * @tc.steps: step1. device C create CommunicatorAggregator and initialize
625 */
626 bool step1 = SetUpEnv(g_envDeviceC, DEVICE_NAME_C);
627 ASSERT_EQ(step1, true);
628
629 /**
630 * @tc.steps: step2. device B register communicator not found callback to CommunicatorAggregator
631 */
632 int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback([](const LabelType &commLabel,
__anone4839d9d0b02(const LabelType &commLabel, const std::string &userId)633 const std::string &userId)->int {
634 return E_OK;
635 }, nullptr);
636 EXPECT_EQ(errCode, E_OK);
637
638 /**
639 * @tc.steps: step3. connect device A with device B, then device B with device C
640 */
641 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
642 AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
643 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
644
645 /**
646 * @tc.steps: step4. device A alloc communicator AA using label A and send message to B
647 */
648 ALLOC_AND_SEND_MESSAGE(A, B, A, 100); // session id 100
649 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
650
651 /**
652 * @tc.steps: step5. device C alloc communicator CA using label A and send message to B
653 */
654 ALLOC_AND_SEND_MESSAGE(C, B, A, 200); // session id 200
655 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
656 DO_SEND_MESSAGE(A, B, A, 300); // session id 300
657 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
658 DO_SEND_MESSAGE(C, B, A, 400); // session id 400
659 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
660
661 /**
662 * @tc.steps: step6. device B alloc communicator BA using label A and register message callback
663 * @tc.expected: step6. communicator BA will receive message in order of sessionid 100, 200, 300, 400.
664 */
665 ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
666 ASSERT_NE(commBA, nullptr);
667 std::vector<std::pair<std::string, Message *>> msgCallbackForBA;
__anone4839d9d0c02(const std::string &srcTarget, Message *inMsg) 668 commBA->RegOnMessageCallback([&msgCallbackForBA](const std::string &srcTarget, Message *inMsg) {
669 msgCallbackForBA.push_back({srcTarget, inMsg});
670 }, nullptr);
671 commBA->Activate();
672 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
673 ASSERT_EQ(msgCallbackForBA.size(), static_cast<size_t>(4)); // total 4 callback
674 EXPECT_EQ(msgCallbackForBA[0].first, DEVICE_NAME_A); // the 0 order element
675 EXPECT_EQ(msgCallbackForBA[1].first, DEVICE_NAME_C); // the 1 order element
676 EXPECT_EQ(msgCallbackForBA[2].first, DEVICE_NAME_A); // the 2 order element
677 EXPECT_EQ(msgCallbackForBA[3].first, DEVICE_NAME_C); // the 3 order element
678 for (uint32_t i = 0; i < msgCallbackForBA.size(); i++) {
679 EXPECT_EQ(msgCallbackForBA[i].second->GetSessionId(), static_cast<uint32_t>((i + 1) * 100)); // 1 sessionid 100
680 delete msgCallbackForBA[i].second;
681 msgCallbackForBA[i].second = nullptr;
682 }
683
684 // Clean up
685 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
686 g_envDeviceC.commAggrHandle->ReleaseCommunicator(commCA);
687 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
688 g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
689 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
690 AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
691 TearDownEnv(g_envDeviceC);
692 }
693
694 /**
695 * @tc.name: ReDeliver Message 003
696 * @tc.desc: For observe memory in unusual scenario
697 * @tc.type: FUNC
698 * @tc.require: AR000DR9KV
699 * @tc.author: xiaozhenjian
700 */
701 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage003, TestSize.Level2)
702 {
703 /**
704 * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
705 */
706 int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback([](const LabelType &commLabel,
__anone4839d9d0d02(const LabelType &commLabel, const std::string &userId)707 const std::string &userId)->int {
708 return E_OK;
709 }, nullptr);
710 EXPECT_EQ(errCode, E_OK);
711
712 /**
713 * @tc.steps: step2. connect device A with device B
714 */
715 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
716 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
717
718 /**
719 * @tc.steps: step3. device A alloc communicator AA,AB,AC using label A,B,C
720 */
721 ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
722 ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
723 ICommunicator *commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errCode);
724 ASSERT_NOT_NULL_AND_ACTIVATE(commAB);
725 ICommunicator *commAC = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_C, errCode);
726 ASSERT_NOT_NULL_AND_ACTIVATE(commAC);
727 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
728
729 /**
730 * @tc.steps: step4. device A Continuously send tiny message to B using communicator AA,AB,AC
731 */
732 for (int turn = 0; turn < 11; turn++) { // Total 11 turns
733 DO_SEND_MESSAGE(A, B, A, 0);
734 DO_SEND_MESSAGE(A, B, B, 0);
735 DO_SEND_MESSAGE(A, B, C, 0);
736 }
737
738 /**
739 * @tc.steps: step5. device A Continuously send giant message to B using communicator AA,AB,AC
740 */
741 for (int turn = 0; turn < 5; turn++) { // Total 5 turns
742 DO_SEND_GIANT_MESSAGE(A, B, A, (3 * 1024 * 1024)); // 3 MBytes, 1024 is scale
743 DO_SEND_GIANT_MESSAGE(A, B, B, (6 * 1024 * 1024)); // 6 MBytes, 1024 is scale
744 DO_SEND_GIANT_MESSAGE(A, B, C, (7 * 1024 * 1024)); // 7 MBytes, 1024 is scale
745 }
746 DO_SEND_GIANT_MESSAGE(A, B, A, (30 * 1024 * 1024)); // 30 MBytes, 1024 is scale
747
748 /**
749 * @tc.steps: step6. wait a long time then send last frame
750 */
751 for (int sec = 0; sec < 15; sec++) { // Total 15 s
752 std::this_thread::sleep_for(std::chrono::seconds(1)); // Sleep 1 s
753 LOGI("[UT][Test][ReDeliverMessage003] Sleep and wait=%d.", sec);
754 }
755 DO_SEND_MESSAGE(A, B, A, 0);
756 std::this_thread::sleep_for(std::chrono::seconds(1)); // Sleep 1 s
757
758 // Clean up
759 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
760 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
761 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAC);
762 g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
763 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
764 }
765