1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17 #include <thread>
18 #include "db_errno.h"
19 #include "distributeddb_communicator_common.h"
20 #include "distributeddb_tools_unit_test.h"
21 #include "endian_convert.h"
22 #include "log_print.h"
23 #include "thread_pool_test_stub.h"
24
25 using namespace std;
26 using namespace testing::ext;
27 using namespace DistributedDB;
28
29 namespace {
30 EnvHandle g_envDeviceA;
31 EnvHandle g_envDeviceB;
32 EnvHandle g_envDeviceC;
33
HandleConnectChange(OnOfflineDevice & onlines,const std::string & target,bool isConnect)34 static void HandleConnectChange(OnOfflineDevice &onlines, const std::string &target, bool isConnect)
35 {
36 if (isConnect) {
37 onlines.onlineDevices.insert(target);
38 onlines.latestOnlineDevice = target;
39 onlines.latestOfflineDevice.clear();
40 } else {
41 onlines.onlineDevices.erase(target);
42 onlines.latestOnlineDevice.clear();
43 onlines.latestOfflineDevice = target;
44 }
45 }
46
47 class DistributedDBCommunicatorTest : public testing::Test {
48 public:
49 static void SetUpTestCase(void);
50 static void TearDownTestCase(void);
51 void SetUp();
52 void TearDown();
53 };
54
SetUpTestCase(void)55 void DistributedDBCommunicatorTest::SetUpTestCase(void)
56 {
57 /**
58 * @tc.setup: Create and init CommunicatorAggregator and AdapterStub
59 */
60 LOGI("[UT][Test][SetUpTestCase] Enter.");
61 bool errCode = SetUpEnv(g_envDeviceA, DEVICE_NAME_A);
62 ASSERT_EQ(errCode, true);
63 errCode = SetUpEnv(g_envDeviceB, DEVICE_NAME_B);
64 ASSERT_EQ(errCode, true);
65 DoRegTransformFunction();
66 CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
67 }
68
TearDownTestCase(void)69 void DistributedDBCommunicatorTest::TearDownTestCase(void)
70 {
71 /**
72 * @tc.teardown: Finalize and release CommunicatorAggregator and AdapterStub
73 */
74 LOGI("[UT][Test][TearDownTestCase] Enter.");
75 std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure all thread quiet and memory released
76 TearDownEnv(g_envDeviceA);
77 TearDownEnv(g_envDeviceB);
78 CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
79 }
80
SetUp()81 void DistributedDBCommunicatorTest::SetUp()
82 {
83 DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
84 }
85
TearDown()86 void DistributedDBCommunicatorTest::TearDown()
87 {
88 /**
89 * @tc.teardown: Wait 100 ms to make sure all thread quiet
90 */
91 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
92 }
93
94 /**
95 * @tc.name: Communicator Management 001
96 * @tc.desc: Test alloc and release communicator
97 * @tc.type: FUNC
98 * @tc.require: AR000BVDGG AR000CQE0L
99 * @tc.author: xiaozhenjian
100 */
101 HWTEST_F(DistributedDBCommunicatorTest, CommunicatorManagement001, TestSize.Level1)
102 {
103 /**
104 * @tc.steps: step1. alloc communicator A using label A
105 * @tc.expected: step1. alloc return OK.
106 */
107 int errorNo = E_OK;
108 ICommunicator *commA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
109 EXPECT_EQ(errorNo, E_OK);
110 EXPECT_NE(commA, nullptr);
111
112 /**
113 * @tc.steps: step2. alloc communicator B using label B
114 * @tc.expected: step2. alloc return OK.
115 */
116 errorNo = E_OK;
117 ICommunicator *commB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
118 EXPECT_EQ(errorNo, E_OK);
119 EXPECT_NE(commA, nullptr);
120
121 /**
122 * @tc.steps: step3. alloc communicator C using label A
123 * @tc.expected: step3. alloc return not OK.
124 */
125 errorNo = E_OK;
126 ICommunicator *commC = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
127 EXPECT_NE(errorNo, E_OK);
128 EXPECT_EQ(commC, nullptr);
129
130 /**
131 * @tc.steps: step4. release communicator A and communicator B
132 */
133 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commA);
134 commA = nullptr;
135 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commB);
136 commB = nullptr;
137
138 /**
139 * @tc.steps: step5. alloc communicator D using label A
140 * @tc.expected: step5. alloc return OK.
141 */
142 errorNo = E_OK;
143 ICommunicator *commD = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
144 EXPECT_EQ(errorNo, E_OK);
145 EXPECT_NE(commD, nullptr);
146
147 /**
148 * @tc.steps: step6. release communicator D
149 */
150 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commD);
151 commD = nullptr;
152 }
153
ConnectWaitDisconnect()154 static void ConnectWaitDisconnect()
155 {
156 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
157 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
158 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
159 }
160
161 /**
162 * @tc.name: Online And Offline 001
163 * @tc.desc: Test functionality triggered by physical devices online and offline
164 * @tc.type: FUNC
165 * @tc.require: AR000BVRNS AR000CQE0H
166 * @tc.author: wudongxing
167 */
168 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline001, TestSize.Level1)
169 {
170 /**
171 * @tc.steps: step1. device A alloc communicator AA using label A and register callback
172 * @tc.expected: step1. no callback.
173 */
174 int errorNo = E_OK;
175 ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
176 ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
177 OnOfflineDevice onlineForAA;
__anone94f6f3c0202(const std::string &target, bool isConnect) 178 commAA->RegOnConnectCallback([&onlineForAA](const std::string &target, bool isConnect) {
179 HandleConnectChange(onlineForAA, target, isConnect);}, nullptr);
180 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
181
182 /**
183 * @tc.steps: step2. connect device A with device B and then disconnect
184 * @tc.expected: step2. no callback.
185 */
186 ConnectWaitDisconnect();
187 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
188
189 /**
190 * @tc.steps: step3. device B alloc communicator BB using label B and register callback
191 * @tc.expected: step3. no callback.
192 */
193 ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
194 ASSERT_NOT_NULL_AND_ACTIVATE(commBB);
195 OnOfflineDevice onlineForBB;
__anone94f6f3c0302(const std::string &target, bool isConnect) 196 commBB->RegOnConnectCallback([&onlineForBB](const std::string &target, bool isConnect) {
197 HandleConnectChange(onlineForBB, target, isConnect);}, nullptr);
198 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
199 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
200
201 /**
202 * @tc.steps: step4. connect device A with device B and then disconnect
203 * @tc.expected: step4. no callback.
204 */
205 ConnectWaitDisconnect();
206 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
207 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
208
209 /**
210 * @tc.steps: step5. device B alloc communicator BA using label A and register callback
211 * @tc.expected: step5. no callback.
212 */
213 ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
214 ASSERT_NOT_NULL_AND_ACTIVATE(commBA);
215 OnOfflineDevice onlineForBA;
__anone94f6f3c0402(const std::string &target, bool isConnect) 216 commBA->RegOnConnectCallback([&onlineForBA](const std::string &target, bool isConnect) {
217 HandleConnectChange(onlineForBA, target, isConnect);}, nullptr);
218 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
219 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
220 EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
221
222 /**
223 * @tc.steps: step6. connect device A with device B
224 * @tc.expected: step6. communicator AA has callback of device B online;
225 * communicator BA has callback of device A online;
226 * communicator BB no callback
227 */
228 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
229 std::this_thread::sleep_for(std::chrono::milliseconds(100));
230 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
231 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
232 EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(1));
233 EXPECT_EQ(onlineForAA.latestOnlineDevice, DEVICE_NAME_B);
234 EXPECT_EQ(onlineForBA.latestOnlineDevice, DEVICE_NAME_A);
235
236 /**
237 * @tc.steps: step7. disconnect device A with device B
238 * @tc.expected: step7. communicator AA has callback of device B offline;
239 * communicator BA has callback of device A offline;
240 * communicator BB no callback
241 */
242 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
243 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
244 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
245 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
246 EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
247 EXPECT_EQ(onlineForAA.latestOfflineDevice, DEVICE_NAME_B);
248 EXPECT_EQ(onlineForBA.latestOfflineDevice, DEVICE_NAME_A);
249
250 // Clean up
251 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
252 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
253 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
254 }
255
/*
 * Register on `communicator` a connect-change callback that forwards every notification to
 * HandleConnectChange together with `online`.
 * `online` must be the name of a variable: it is captured by reference, so it has to stay alive for as
 * long as the callback can be invoked.
 * The body is wrapped in do { } while (0) so that the macro followed by ';' is exactly one statement and
 * can be used safely inside an unbraced if/else. `communicator` is parenthesized so that an expression
 * may be passed.
 */
#define REG_CONNECT_CALLBACK(communicator, online) \
    do { \
        (communicator)->RegOnConnectCallback([&online](const std::string &target, bool isConnect) { \
            HandleConnectChange(online, target, isConnect); \
        }, nullptr); \
    } while (0)
262
/*
 * Connect the adapters of device A and device B, then sleep for `waitTime` milliseconds.
 * The body is wrapped in do { } while (0) so that the macro followed by ';' is exactly one statement and
 * can be used safely inside an unbraced if/else. `waitTime` is parenthesized so that an expression may
 * be passed.
 */
#define CONNECT_AND_WAIT(waitTime) \
    do { \
        AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle); \
        std::this_thread::sleep_for(std::chrono::milliseconds((waitTime))); \
    } while (0)
268
269 /**
270 * @tc.name: Online And Offline 002
271 * @tc.desc: Test functionality triggered by alloc and release communicator
272 * @tc.type: FUNC
273 * @tc.require: AR000BVRNT AR000CQE0I
274 * @tc.author: wudongxing
275 */
276 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline002, TestSize.Level1)
277 {
278 /**
279 * @tc.steps: step1. connect device A with device B
280 */
281 CONNECT_AND_WAIT(200); // Sleep 200 ms
282
283 /**
284 * @tc.steps: step2. device A alloc communicator AA using label A and register callback
285 * @tc.expected: step2. no callback.
286 */
287 int errorNo = E_OK;
288 ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
289 ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
290 OnOfflineDevice onlineForAA;
291 REG_CONNECT_CALLBACK(commAA, onlineForAA);
292 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
293
294 /**
295 * @tc.steps: step3. device B alloc communicator BB using label B and register callback
296 * @tc.expected: step3. no callback.
297 */
298 ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
299 ASSERT_NOT_NULL_AND_ACTIVATE(commBB);
300 OnOfflineDevice onlineForBB;
301 REG_CONNECT_CALLBACK(commBB, onlineForBB);
302 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
303 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
304
305 /**
306 * @tc.steps: step4. device B alloc communicator BA using label A and register callback
307 * @tc.expected: step4. communicator AA has callback of device B online;
308 * communicator BA has callback of device A online;
309 * communicator BB no callback.
310 */
311 ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
312 ASSERT_NOT_NULL_AND_ACTIVATE(commBA);
313 OnOfflineDevice onlineForBA;
314 REG_CONNECT_CALLBACK(commBA, onlineForBA);
315 std::this_thread::sleep_for(std::chrono::milliseconds(100));
316 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
317 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
318 EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(1));
319 EXPECT_EQ(onlineForAA.latestOnlineDevice, DEVICE_NAME_B);
320 EXPECT_EQ(onlineForBA.latestOnlineDevice, DEVICE_NAME_A);
321
322 /**
323 * @tc.steps: step5. device A alloc communicator AB using label B and register callback
324 * @tc.expected: step5. communicator AB has callback of device B online;
325 * communicator BB has callback of device A online;
326 */
327 ICommunicator *commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
328 ASSERT_NOT_NULL_AND_ACTIVATE(commAB);
329 OnOfflineDevice onlineForAB;
330 REG_CONNECT_CALLBACK(commAB, onlineForAB);
331 std::this_thread::sleep_for(std::chrono::milliseconds(100));
332 EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
333 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
334 EXPECT_EQ(onlineForAB.latestOnlineDevice, DEVICE_NAME_B);
335 EXPECT_EQ(onlineForBB.latestOnlineDevice, DEVICE_NAME_A);
336
337 /**
338 * @tc.steps: step6. device A release communicator AA
339 * @tc.expected: step6. communicator BA has callback of device A offline;
340 * communicator AB and BB no callback;
341 */
342 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
343 std::this_thread::sleep_for(std::chrono::milliseconds(100));
344 EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
345 EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
346 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
347 EXPECT_EQ(onlineForBA.latestOfflineDevice, DEVICE_NAME_A);
348
349 /**
350 * @tc.steps: step7. device B release communicator BA
351 * @tc.expected: step7. communicator AB and BB no callback;
352 */
353 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
354 EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
355 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
356
357 /**
358 * @tc.steps: step8. device B release communicator BB
359 * @tc.expected: step8. communicator AB has callback of device B offline;
360 */
361 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
362 std::this_thread::sleep_for(std::chrono::milliseconds(100));
363 EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(0));
364 EXPECT_EQ(onlineForAB.latestOfflineDevice, DEVICE_NAME_B);
365
366 // Clean up
367 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
368 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
369 }
370
TestRemoteRestart()371 void TestRemoteRestart()
372 {
373 /**
374 * @tc.steps: step1. connect device D with device E
375 */
376 EnvHandle envDeviceD;
377 EnvHandle envDeviceE;
378 SetUpEnv(envDeviceD, "DEVICE_D");
379 SetUpEnv(envDeviceE, "DEVICE_E");
380
381 /**
382 * @tc.steps: step2. device D alloc communicator DD using label A and register callback
383 */
384 int errorNo = E_OK;
385 ICommunicator *commDD = envDeviceD.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
386 ASSERT_NOT_NULL_AND_ACTIVATE(commDD);
387 OnOfflineDevice onlineForDD;
388 REG_CONNECT_CALLBACK(commDD, onlineForDD);
389
390 /**
391 * @tc.steps: step3. device E alloc communicator EE using label A and register callback
392 */
393 ICommunicator *commEE = envDeviceE.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
394 ASSERT_NOT_NULL_AND_ACTIVATE(commEE);
395 OnOfflineDevice onlineForEE;
396 REG_CONNECT_CALLBACK(commEE, onlineForEE);
397 /**
398 * @tc.steps: step4. deviceD connect to deviceE
399 * @tc.expected: step4. both communicator has callback;
400 */
401 AdapterStub::ConnectAdapterStub(envDeviceD.adapterHandle, envDeviceE.adapterHandle);
402 std::this_thread::sleep_for(std::chrono::seconds(1));
403 EXPECT_EQ(onlineForDD.onlineDevices.size(), static_cast<size_t>(1));
404 EXPECT_EQ(onlineForEE.onlineDevices.size(), static_cast<size_t>(1));
405
406 /**
407 * @tc.steps: step5. device E restart
408 */
409 envDeviceE.commAggrHandle->ReleaseCommunicator(commEE);
410 std::this_thread::sleep_for(std::chrono::seconds(1));
411 TearDownEnv(envDeviceE);
412 SetUpEnv(envDeviceE, "DEVICE_E");
413
414 commEE = envDeviceE.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
415 ASSERT_NOT_NULL_AND_ACTIVATE(commEE);
416 REG_CONNECT_CALLBACK(commEE, onlineForEE);
417 onlineForEE.onlineDevices.clear();
418 /**
419 * @tc.steps: step6. deviceD connect to deviceE again
420 * @tc.expected: step6. communicatorE has callback;
421 */
422 AdapterStub::ConnectAdapterStub(envDeviceD.adapterHandle, envDeviceE.adapterHandle);
423 std::this_thread::sleep_for(std::chrono::seconds(1));
424 EXPECT_EQ(onlineForEE.onlineDevices.size(), static_cast<size_t>(1));
425 // Clean up and disconnect
426 envDeviceD.commAggrHandle->ReleaseCommunicator(commDD);
427 envDeviceE.commAggrHandle->ReleaseCommunicator(commEE);
428 std::this_thread::sleep_for(std::chrono::seconds(1));
429
430 AdapterStub::DisconnectAdapterStub(envDeviceD.adapterHandle, envDeviceE.adapterHandle);
431 TearDownEnv(envDeviceD);
432 TearDownEnv(envDeviceE);
433 }
434 /**
435 * @tc.name: Online And Offline 003
436 * @tc.desc: Test functionality triggered by remote restart
437 * @tc.type: FUNC
438 * @tc.require: AR000CQE0I
439 * @tc.author: zhangqiquan
440 */
441 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline003, TestSize.Level1)
442 {
443 TestRemoteRestart();
444 }
445
446 /**
447 * @tc.name: Online And Offline 004
448 * @tc.desc: Test functionality triggered by remote restart with thread pool
449 * @tc.type: FUNC
450 * @tc.require: AR000CQE0I
451 * @tc.author: zhangqiquan
452 */
453 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline004, TestSize.Level1)
454 {
455 auto threadPool = std::make_shared<ThreadPoolTestStub>();
456 RuntimeContext::GetInstance()->SetThreadPool(threadPool);
457 TestRemoteRestart();
458 RuntimeContext::GetInstance()->SetThreadPool(nullptr);
459 }
460
461 /**
462 * @tc.name: Report Device Connect Change 001
463 * @tc.desc: Test CommunicatorAggregator support report device connect change event
464 * @tc.type: FUNC
465 * @tc.require: AR000DR9KV
466 * @tc.author: xiaozhenjian
467 */
468 HWTEST_F(DistributedDBCommunicatorTest, ReportDeviceConnectChange001, TestSize.Level1)
469 {
470 /**
471 * @tc.steps: step1. device A and device B register connect callback to CommunicatorAggregator
472 */
473 OnOfflineDevice onlineForA;
474 int errCode = g_envDeviceA.commAggrHandle->RegOnConnectCallback(
__anone94f6f3c0502(const std::string &target, bool isConnect) 475 [&onlineForA](const std::string &target, bool isConnect) {
476 HandleConnectChange(onlineForA, target, isConnect);
477 }, nullptr);
478 EXPECT_EQ(errCode, E_OK);
479 OnOfflineDevice onlineForB;
480 errCode = g_envDeviceB.commAggrHandle->RegOnConnectCallback(
__anone94f6f3c0602(const std::string &target, bool isConnect) 481 [&onlineForB](const std::string &target, bool isConnect) {
482 HandleConnectChange(onlineForB, target, isConnect);
483 }, nullptr);
484 EXPECT_EQ(errCode, E_OK);
485
486 /**
487 * @tc.steps: step2. connect device A with device B
488 * @tc.expected: step2. device A callback B online; device B callback A online;
489 */
490 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
491 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
492 EXPECT_EQ(onlineForA.onlineDevices.size(), static_cast<size_t>(1));
493 EXPECT_EQ(onlineForB.onlineDevices.size(), static_cast<size_t>(1));
494 EXPECT_EQ(onlineForA.latestOnlineDevice, DEVICE_NAME_B);
495 EXPECT_EQ(onlineForB.latestOnlineDevice, DEVICE_NAME_A);
496
497 /**
498 * @tc.steps: step3. connect device A with device B
499 * @tc.expected: step3. device A callback B offline; device B callback A offline;
500 */
501 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
502 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
503 EXPECT_EQ(onlineForA.onlineDevices.size(), static_cast<size_t>(0));
504 EXPECT_EQ(onlineForB.onlineDevices.size(), static_cast<size_t>(0));
505 EXPECT_EQ(onlineForA.latestOfflineDevice, DEVICE_NAME_B);
506 EXPECT_EQ(onlineForB.latestOfflineDevice, DEVICE_NAME_A);
507
508 // Clean up
509 g_envDeviceA.commAggrHandle->RegOnConnectCallback(nullptr, nullptr);
510 g_envDeviceB.commAggrHandle->RegOnConnectCallback(nullptr, nullptr);
511 }
512
513 namespace {
ToLabelType(uint64_t commLabel)514 LabelType ToLabelType(uint64_t commLabel)
515 {
516 uint64_t netOrderLabel = HostToNet(commLabel);
517 uint8_t *eachByte = reinterpret_cast<uint8_t *>(&netOrderLabel);
518 std::vector<uint8_t> realLabel(COMM_LABEL_LENGTH, 0);
519 for (int i = 0; i < static_cast<int>(sizeof(uint64_t)); i++) {
520 realLabel[i] = eachByte[i];
521 }
522 return realLabel;
523 }
524 }
525
526 /**
527 * @tc.name: Report Communicator Not Found 001
528 * @tc.desc: Test CommunicatorAggregator support report communicator not found event
529 * @tc.type: FUNC
530 * @tc.require: AR000DR9KV
531 * @tc.author: xiaozhenjian
532 */
533 HWTEST_F(DistributedDBCommunicatorTest, ReportCommunicatorNotFound001, TestSize.Level1)
534 {
535 /**
536 * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
537 */
538 std::vector<LabelType> lackLabels;
539 int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(
__anone94f6f3c0802(const LabelType &commLabel, const std::string &userId)540 [&lackLabels](const LabelType &commLabel, const std::string &userId)->int {
541 lackLabels.push_back(commLabel);
542 return -E_NOT_FOUND;
543 }, nullptr);
544 EXPECT_EQ(errCode, E_OK);
545
546 /**
547 * @tc.steps: step2. connect device A with device B
548 */
549 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
550 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
551
552 /**
553 * @tc.steps: step3. device A alloc communicator AA using label A and send message to B
554 * @tc.expected: step3. device B callback that label A not found.
555 */
556 ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
557 ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
558 Message *msgForAA = BuildRegedTinyMessage();
559 ASSERT_NE(msgForAA, nullptr);
560 SendConfig conf = {true, false, 0};
561 errCode = commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
562 EXPECT_EQ(errCode, E_OK);
563 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
564 ASSERT_EQ(lackLabels.size(), static_cast<size_t>(1));
565 EXPECT_EQ(lackLabels[0], ToLabelType(LABEL_A));
566
567 /**
568 * @tc.steps: step4. device B alloc communicator BA using label A and register message callback
569 * @tc.expected: step4. communicator BA will not receive message.
570 */
571 ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
572 ASSERT_NE(commBA, nullptr);
573 Message *recvMsgForBA = nullptr;
__anone94f6f3c0902(const std::string &srcTarget, Message *inMsg) 574 commBA->RegOnMessageCallback([&recvMsgForBA](const std::string &srcTarget, Message *inMsg) {
575 recvMsgForBA = inMsg;
576 }, nullptr);
577 commBA->Activate();
578 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
579 EXPECT_EQ(recvMsgForBA, nullptr);
580
581 // Clean up
582 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
583 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
584 g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
585 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
586 }
587
/*
 * Build a message with BuildRegedTinyMessage, set its session id to `session` and send it through
 * communicator comm<src><label> to DEVICE_NAME_<dst>.
 * Expects a communicator variable named comm<src><label> and an assignable `errCode` in the calling
 * scope; `errCode` receives the result of SendMessage and is expected to be E_OK.
 * Contains ASSERT_NE, so it leaves the calling function when the message cannot be built.
 */
#define DO_SEND_MESSAGE(src, dst, label, session) \
    { \
        Message *msgFor##src##label = BuildRegedTinyMessage(); \
        ASSERT_NE(msgFor##src##label, nullptr); \
        msgFor##src##label->SetSessionId(session); \
        SendConfig sendConf = {true, false, 0}; \
        errCode = comm##src##label->SendMessage(DEVICE_NAME_##dst, msgFor##src##label, sendConf); \
        EXPECT_EQ(errCode, E_OK); \
    }
597
/*
 * Build a message with BuildRegedGiantMessage(size) and send it through communicator comm<src><label>
 * to DEVICE_NAME_<dst>.
 * Expects a communicator variable named comm<src><label> and an assignable `errCode` in the calling
 * scope; `errCode` receives the result of SendMessage and is expected to be E_OK.
 * Contains ASSERT_NE, so it leaves the calling function when the message cannot be built.
 */
#define DO_SEND_GIANT_MESSAGE(src, dst, label, size) \
    { \
        Message *msgFor##src##label = BuildRegedGiantMessage(size); \
        ASSERT_NE(msgFor##src##label, nullptr); \
        SendConfig sendConf = {false, false, 0}; \
        errCode = comm##src##label->SendMessage(DEVICE_NAME_##dst, msgFor##src##label, sendConf); \
        EXPECT_EQ(errCode, E_OK); \
    }
606
/*
 * Declare communicator comm<src><label> in the calling scope, allocated from g_envDevice<src> with
 * LABEL_<label>, run ASSERT_NOT_NULL_AND_ACTIVATE on it and then send one message to DEVICE_NAME_<dst>
 * with DO_SEND_MESSAGE. Expects an assignable `errCode` in the calling scope.
 * The declared communicator stays visible after the macro and has to be released by the caller.
 */
#define ALLOC_AND_SEND_MESSAGE(src, dst, label, session) \
    ICommunicator *comm##src##label = \
        g_envDevice##src.commAggrHandle->AllocCommunicator(LABEL_##label, errCode); \
    ASSERT_NOT_NULL_AND_ACTIVATE(comm##src##label); \
    DO_SEND_MESSAGE(src, dst, label, session)
611
/*
 * Declare srcTargetFor<src><label> and recvMsgFor<src><label> in the calling scope and register on
 * communicator comm<src><label> a message callback that stores the source device name and the message
 * pointer of each received message into them. Both variables are captured by reference.
 */
#define REG_MESSAGE_CALLBACK(src, label) \
    std::string srcTargetFor##src##label; \
    Message *recvMsgFor##src##label = nullptr; \
    comm##src##label->RegOnMessageCallback( \
        [&srcTargetFor##src##label, &recvMsgFor##src##label](const std::string &sender, Message *received) { \
            srcTargetFor##src##label = sender; \
            recvMsgFor##src##label = received; \
        }, nullptr);
620
621 /**
622 * @tc.name: ReDeliver Message 001
623 * @tc.desc: Test CommunicatorAggregator support redeliver message
624 * @tc.type: FUNC
625 * @tc.require: AR000DR9KV
626 * @tc.author: xiaozhenjian
627 */
628 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage001, TestSize.Level1)
629 {
630 /**
631 * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
632 */
633 std::vector<LabelType> lackLabels;
634 int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(
__anone94f6f3c0a02(const LabelType &commLabel, const std::string &userId)635 [&lackLabels](const LabelType &commLabel, const std::string &userId)->int {
636 lackLabels.push_back(commLabel);
637 return E_OK;
638 }, nullptr);
639 EXPECT_EQ(errCode, E_OK);
640
641 /**
642 * @tc.steps: step2. connect device A with device B
643 */
644 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
645 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
646
647 /**
648 * @tc.steps: step3. device A alloc communicator AA using label A and send message to B
649 * @tc.expected: step3. device B callback that label A not found.
650 */
651 ALLOC_AND_SEND_MESSAGE(A, B, A, 100); // session id 100
652 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
653 ASSERT_EQ(lackLabels.size(), static_cast<size_t>(1));
654 EXPECT_EQ(lackLabels[0], ToLabelType(LABEL_A));
655
656 /**
657 * @tc.steps: step4. device A alloc communicator AB using label B and send message to B
658 * @tc.expected: step4. device B callback that label B not found.
659 */
660 ALLOC_AND_SEND_MESSAGE(A, B, B, 200); // session id 200
661 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
662 ASSERT_EQ(lackLabels.size(), static_cast<size_t>(2));
663 EXPECT_EQ(lackLabels[1], ToLabelType(LABEL_B)); // 1 for second element
664
665 /**
666 * @tc.steps: step5. device B alloc communicator BA using label A and register message callback
667 * @tc.expected: step5. communicator BA will receive message.
668 */
669 ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
670 ASSERT_NE(commBA, nullptr);
671 REG_MESSAGE_CALLBACK(B, A);
672 commBA->Activate();
673 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
674 EXPECT_EQ(srcTargetForBA, DEVICE_NAME_A);
675 ASSERT_NE(recvMsgForBA, nullptr);
676 EXPECT_EQ(recvMsgForBA->GetSessionId(), 100U); // session id 100
677 delete recvMsgForBA;
678 recvMsgForBA = nullptr;
679
680 /**
681 * @tc.steps: step6. device B alloc communicator BB using label B and register message callback
682 * @tc.expected: step6. communicator BB will receive message.
683 */
684 ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errCode);
685 ASSERT_NE(commBB, nullptr);
686 REG_MESSAGE_CALLBACK(B, B);
687 commBB->Activate();
688 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
689 EXPECT_EQ(srcTargetForBB, DEVICE_NAME_A);
690 ASSERT_NE(recvMsgForBB, nullptr);
691 EXPECT_EQ(recvMsgForBB->GetSessionId(), 200U); // session id 200
692 delete recvMsgForBB;
693 recvMsgForBB = nullptr;
694
695 // Clean up
696 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
697 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
698 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
699 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
700 g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
701 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
702 }
703
704 /**
705 * @tc.name: ReDeliver Message 002
706 * @tc.desc: Test CommunicatorAggregator support redeliver message by order
707 * @tc.type: FUNC
708 * @tc.require: AR000DR9KV
709 * @tc.author: xiaozhenjian
710 */
711 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage002, TestSize.Level1)
712 {
713 /**
714 * @tc.steps: step1. device C create CommunicatorAggregator and initialize
715 */
716 bool step1 = SetUpEnv(g_envDeviceC, DEVICE_NAME_C);
717 ASSERT_EQ(step1, true);
718
719 /**
720 * @tc.steps: step2. device B register communicator not found callback to CommunicatorAggregator
721 */
722 int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback([](const LabelType &commLabel,
__anone94f6f3c0b02(const LabelType &commLabel, const std::string &userId)723 const std::string &userId)->int {
724 return E_OK;
725 }, nullptr);
726 EXPECT_EQ(errCode, E_OK);
727
728 /**
729 * @tc.steps: step3. connect device A with device B, then device B with device C
730 */
731 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
732 AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
733 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
734
735 /**
736 * @tc.steps: step4. device A alloc communicator AA using label A and send message to B
737 */
738 ALLOC_AND_SEND_MESSAGE(A, B, A, 100); // session id 100
739 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
740
741 /**
742 * @tc.steps: step5. device C alloc communicator CA using label A and send message to B
743 */
744 ALLOC_AND_SEND_MESSAGE(C, B, A, 200); // session id 200
745 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
746 DO_SEND_MESSAGE(A, B, A, 300); // session id 300
747 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
748 DO_SEND_MESSAGE(C, B, A, 400); // session id 400
749 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
750
751 /**
752 * @tc.steps: step6. device B alloc communicator BA using label A and register message callback
753 * @tc.expected: step6. communicator BA will receive message in order of sessionid 100, 200, 300, 400.
754 */
755 ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
756 ASSERT_NE(commBA, nullptr);
757 std::vector<std::pair<std::string, Message *>> msgCallbackForBA;
__anone94f6f3c0c02(const std::string &srcTarget, Message *inMsg) 758 commBA->RegOnMessageCallback([&msgCallbackForBA](const std::string &srcTarget, Message *inMsg) {
759 msgCallbackForBA.push_back({srcTarget, inMsg});
760 }, nullptr);
761 commBA->Activate();
762 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
763 ASSERT_EQ(msgCallbackForBA.size(), static_cast<size_t>(4)); // total 4 callback
764 EXPECT_EQ(msgCallbackForBA[0].first, DEVICE_NAME_A); // the 0 order element
765 EXPECT_EQ(msgCallbackForBA[1].first, DEVICE_NAME_C); // the 1 order element
766 EXPECT_EQ(msgCallbackForBA[2].first, DEVICE_NAME_A); // the 2 order element
767 EXPECT_EQ(msgCallbackForBA[3].first, DEVICE_NAME_C); // the 3 order element
768 for (uint32_t i = 0; i < msgCallbackForBA.size(); i++) {
769 EXPECT_EQ(msgCallbackForBA[i].second->GetSessionId(), static_cast<uint32_t>((i + 1) * 100)); // 1 sessionid 100
770 delete msgCallbackForBA[i].second;
771 msgCallbackForBA[i].second = nullptr;
772 }
773
774 // Clean up
775 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
776 g_envDeviceC.commAggrHandle->ReleaseCommunicator(commCA);
777 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
778 g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
779 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
780 AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
781 TearDownEnv(g_envDeviceC);
782 }
783
784 /**
785 * @tc.name: ReDeliver Message 003
786 * @tc.desc: For observe memory in unusual scenario
787 * @tc.type: FUNC
788 * @tc.require: AR000DR9KV
789 * @tc.author: xiaozhenjian
790 */
791 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage003, TestSize.Level2)
792 {
793 /**
794 * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
795 */
796 int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback([](const LabelType &commLabel,
__anone94f6f3c0d02(const LabelType &commLabel, const std::string &userId)797 const std::string &userId)->int {
798 return E_OK;
799 }, nullptr);
800 EXPECT_EQ(errCode, E_OK);
801
802 /**
803 * @tc.steps: step2. connect device A with device B
804 */
805 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
806 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
807
808 /**
809 * @tc.steps: step3. device A alloc communicator AA,AB,AC using label A,B,C
810 */
811 ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
812 ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
813 ICommunicator *commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errCode);
814 ASSERT_NOT_NULL_AND_ACTIVATE(commAB);
815 ICommunicator *commAC = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_C, errCode);
816 ASSERT_NOT_NULL_AND_ACTIVATE(commAC);
817 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
818
819 /**
820 * @tc.steps: step4. device A Continuously send tiny message to B using communicator AA,AB,AC
821 */
822 for (int turn = 0; turn < 11; turn++) { // Total 11 turns
823 DO_SEND_MESSAGE(A, B, A, 0);
824 DO_SEND_MESSAGE(A, B, B, 0);
825 DO_SEND_MESSAGE(A, B, C, 0);
826 }
827
828 /**
829 * @tc.steps: step5. device A Continuously send giant message to B using communicator AA,AB,AC
830 */
831 for (int turn = 0; turn < 5; turn++) { // Total 5 turns
832 DO_SEND_GIANT_MESSAGE(A, B, A, (3 * 1024 * 1024)); // 3 MBytes, 1024 is scale
833 DO_SEND_GIANT_MESSAGE(A, B, B, (6 * 1024 * 1024)); // 6 MBytes, 1024 is scale
834 DO_SEND_GIANT_MESSAGE(A, B, C, (7 * 1024 * 1024)); // 7 MBytes, 1024 is scale
835 }
836 DO_SEND_GIANT_MESSAGE(A, B, A, (30 * 1024 * 1024)); // 30 MBytes, 1024 is scale
837
838 /**
839 * @tc.steps: step6. wait a long time then send last frame
840 */
841 for (int sec = 0; sec < 15; sec++) { // Total 15 s
842 std::this_thread::sleep_for(std::chrono::seconds(1)); // Sleep 1 s
843 LOGI("[UT][Test][ReDeliverMessage003] Sleep and wait=%d.", sec);
844 }
845 DO_SEND_MESSAGE(A, B, A, 0);
846 std::this_thread::sleep_for(std::chrono::seconds(1)); // Sleep 1 s
847
848 // Clean up
849 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
850 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
851 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAC);
852 g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
853 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
854 }
855 }