1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include <gtest/gtest.h>
#include <atomic>
#include <chrono>
#include <memory>
#include <new>
#include <string>
#include <thread>
#include "db_errno.h"
#include "distributeddb_communicator_common.h"
#include "distributeddb_tools_unit_test.h"
#include "log_print.h"
#include "message.h"
#include "serial_buffer.h"
25
26 using namespace std;
27 using namespace testing::ext;
28 using namespace DistributedDB;
29
namespace {
// Test environments of the three simulated devices. Each one is prepared by SetUpEnv() in SetUpTestCase()
// and exposes the commAggrHandle / adapterHandle members used throughout this file.
EnvHandle g_envDeviceA;
EnvHandle g_envDeviceB;
EnvHandle g_envDeviceC;
// Communicators are named g_comm<Device><Label>: e.g. g_commAB is allocated from device A's aggregator
// with LABEL_B (see AllocAllCommunicator). ReleaseAllCommunicator() resets every one of them to nullptr.
ICommunicator *g_commAA = nullptr;
ICommunicator *g_commAB = nullptr;
ICommunicator *g_commBB = nullptr;
ICommunicator *g_commBC = nullptr;
ICommunicator *g_commCC = nullptr;
ICommunicator *g_commCA = nullptr;
}
41
42 class DistributedDBCommunicatorDeepTest : public testing::Test {
43 public:
44 static void SetUpTestCase(void);
45 static void TearDownTestCase(void);
46 void SetUp();
47 void TearDown();
48 };
49
SetUpTestCase(void)50 void DistributedDBCommunicatorDeepTest::SetUpTestCase(void)
51 {
52 /**
53 * @tc.setup: Create and init CommunicatorAggregator and AdapterStub
54 */
55 LOGI("[UT][DeepTest][SetUpTestCase] Enter.");
56 bool errCode = SetUpEnv(g_envDeviceA, DEVICE_NAME_A);
57 ASSERT_EQ(errCode, true);
58 errCode = SetUpEnv(g_envDeviceB, DEVICE_NAME_B);
59 ASSERT_EQ(errCode, true);
60 errCode = SetUpEnv(g_envDeviceC, DEVICE_NAME_C);
61 ASSERT_EQ(errCode, true);
62 DoRegTransformFunction();
63 CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
64 }
65
TearDownTestCase(void)66 void DistributedDBCommunicatorDeepTest::TearDownTestCase(void)
67 {
68 /**
69 * @tc.teardown: Finalize and release CommunicatorAggregator and AdapterStub
70 */
71 LOGI("[UT][DeepTest][TearDownTestCase] Enter.");
72 std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure all thread quiet and memory released
73 TearDownEnv(g_envDeviceA);
74 TearDownEnv(g_envDeviceB);
75 TearDownEnv(g_envDeviceC);
76 CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
77 }
78
79 namespace {
AllocAllCommunicator()80 void AllocAllCommunicator()
81 {
82 int errorNo = E_OK;
83 g_commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
84 ASSERT_NOT_NULL_AND_ACTIVATE(g_commAA);
85 g_commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
86 ASSERT_NOT_NULL_AND_ACTIVATE(g_commAB);
87 g_commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
88 ASSERT_NOT_NULL_AND_ACTIVATE(g_commBB);
89 g_commBC = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_C, errorNo);
90 ASSERT_NOT_NULL_AND_ACTIVATE(g_commBC);
91 g_commCC = g_envDeviceC.commAggrHandle->AllocCommunicator(LABEL_C, errorNo);
92 ASSERT_NOT_NULL_AND_ACTIVATE(g_commCC);
93 g_commCA = g_envDeviceC.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
94 ASSERT_NOT_NULL_AND_ACTIVATE(g_commCA);
95 }
96
ReleaseAllCommunicator()97 void ReleaseAllCommunicator()
98 {
99 g_envDeviceA.commAggrHandle->ReleaseCommunicator(g_commAA);
100 g_commAA = nullptr;
101 g_envDeviceA.commAggrHandle->ReleaseCommunicator(g_commAB);
102 g_commAB = nullptr;
103 g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBB);
104 g_commBB = nullptr;
105 g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBC);
106 g_commBC = nullptr;
107 g_envDeviceC.commAggrHandle->ReleaseCommunicator(g_commCC);
108 g_commCC = nullptr;
109 g_envDeviceC.commAggrHandle->ReleaseCommunicator(g_commCA);
110 g_commCA = nullptr;
111 }
112 }
113
SetUp()114 void DistributedDBCommunicatorDeepTest::SetUp()
115 {
116 DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
117 /**
118 * @tc.setup: Alloc communicator AA, AB, BB, BC, CC, CA
119 */
120 AllocAllCommunicator();
121 }
122
TearDown()123 void DistributedDBCommunicatorDeepTest::TearDown()
124 {
125 /**
126 * @tc.teardown: Release communicator AA, AB, BB, BC, CC, CA
127 */
128 ReleaseAllCommunicator();
129 std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure all thread quiet
130 }
131
132 /**
133 * @tc.name: WaitAndRetrySend 001
134 * @tc.desc: Test send retry semantic
135 * @tc.type: FUNC
136 * @tc.require: AR000BVDGI AR000CQE0M
137 * @tc.author: xiaozhenjian
138 */
139 HWTEST_F(DistributedDBCommunicatorDeepTest, WaitAndRetrySend001, TestSize.Level2)
140 {
141 // Preset
142 Message *msgForBB = nullptr;
__anona787b8db0302(const std::string &srcTarget, Message *inMsg) 143 g_commBB->RegOnMessageCallback([&msgForBB](const std::string &srcTarget, Message *inMsg) {
144 msgForBB = inMsg;
145 }, nullptr);
146
147 /**
148 * @tc.steps: step1. connect device A with device B
149 */
150 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
151 std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure quiet
152
153 /**
154 * @tc.steps: step2. device A simulate send retry
155 */
156 g_envDeviceA.adapterHandle->SimulateSendRetry(DEVICE_NAME_B);
157
158 /**
159 * @tc.steps: step3. device A send message to device B using communicator AB
160 * @tc.expected: step3. communicator BB received no message
161 */
162 Message *msgForAB = BuildRegedTinyMessage();
163 ASSERT_NE(msgForAB, nullptr);
164 SendConfig conf = {true, false, 0};
165 int errCode = g_commAB->SendMessage(DEVICE_NAME_B, msgForAB, conf);
166 EXPECT_EQ(errCode, E_OK);
167 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
168 EXPECT_EQ(msgForBB, nullptr);
169
170 /**
171 * @tc.steps: step4. device A simulate sendable feedback
172 * @tc.expected: step4. communicator BB received the message
173 */
174 g_envDeviceA.adapterHandle->SimulateSendRetryClear(DEVICE_NAME_B);
175 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
176 EXPECT_NE(msgForBB, nullptr);
177 delete msgForBB;
178 msgForBB = nullptr;
179
180 // CleanUp
181 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
182 }
183
CreateBufferThenAddIntoScheduler(SendTaskScheduler & scheduler,const std::string & dstTarget,Priority inPrio)184 static int CreateBufferThenAddIntoScheduler(SendTaskScheduler &scheduler, const std::string &dstTarget, Priority inPrio)
185 {
186 SerialBuffer *eachBuff = new (std::nothrow) SerialBuffer();
187 if (eachBuff == nullptr) {
188 return -E_OUT_OF_MEMORY;
189 }
190 int errCode = eachBuff->AllocBufferByTotalLength(100, 0); // 100 totallen without header
191 if (errCode != E_OK) {
192 delete eachBuff;
193 eachBuff = nullptr;
194 return errCode;
195 }
196 SendTask task{eachBuff, dstTarget};
197 errCode = scheduler.AddSendTaskIntoSchedule(task, inPrio);
198 if (errCode != E_OK) {
199 delete eachBuff;
200 eachBuff = nullptr;
201 return errCode;
202 }
203 return E_OK;
204 }
205
206 /**
207 * @tc.name: SendSchedule 001
 * @tc.desc: Test schedule in Priority order then in send order
209 * @tc.type: FUNC
210 * @tc.require: AR000BVDGI AR000CQE0M
211 * @tc.author: xiaozhenjian
212 */
213 HWTEST_F(DistributedDBCommunicatorDeepTest, SendSchedule001, TestSize.Level2)
214 {
215 // Preset
216 SendTaskScheduler scheduler;
217 scheduler.Initialize();
218
219 /**
220 * @tc.steps: step1. Add low priority target A buffer to schecduler
221 */
222 int errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_A, Priority::LOW);
223 EXPECT_EQ(errCode, E_OK);
224
225 /**
226 * @tc.steps: step2. Add low priority target B buffer to schecduler
227 */
228 errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_B, Priority::LOW);
229 EXPECT_EQ(errCode, E_OK);
230
231 /**
232 * @tc.steps: step3. Add normal priority target B buffer to schecduler
233 */
234 errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_B, Priority::NORMAL);
235 EXPECT_EQ(errCode, E_OK);
236
237 /**
238 * @tc.steps: step4. Add normal priority target C buffer to schecduler
239 */
240 errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_C, Priority::NORMAL);
241 EXPECT_EQ(errCode, E_OK);
242
243 /**
244 * @tc.steps: step5. Add high priority target C buffer to schecduler
245 */
246 errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_C, Priority::HIGH);
247 EXPECT_EQ(errCode, E_OK);
248
249 /**
250 * @tc.steps: step6. Add high priority target A buffer to schecduler
251 */
252 errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_A, Priority::HIGH);
253 EXPECT_EQ(errCode, E_OK);
254
255 /**
256 * @tc.steps: step7. schedule out buffers one by one
257 * @tc.expected: step7. the order is: high priority target C
258 * high priority target A
259 * normal priority target B
260 * normal priority target C
261 * low priority target A
262 * low priority target B
263 */
264 SendTask outTask;
265 SendTaskInfo outTaskInfo;
266 // high priority target C
267 errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo);
268 ASSERT_EQ(errCode, E_OK);
269 EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_C);
270 EXPECT_EQ(outTaskInfo.taskPrio, Priority::HIGH);
271 scheduler.FinalizeLastScheduleTask();
272 // high priority target A
273 errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo);
274 ASSERT_EQ(errCode, E_OK);
275 EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_A);
276 EXPECT_EQ(outTaskInfo.taskPrio, Priority::HIGH);
277 scheduler.FinalizeLastScheduleTask();
278 // normal priority target B
279 errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo);
280 ASSERT_EQ(errCode, E_OK);
281 EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_B);
282 EXPECT_EQ(outTaskInfo.taskPrio, Priority::NORMAL);
283 scheduler.FinalizeLastScheduleTask();
284 // normal priority target C
285 errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo);
286 ASSERT_EQ(errCode, E_OK);
287 EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_C);
288 EXPECT_EQ(outTaskInfo.taskPrio, Priority::NORMAL);
289 scheduler.FinalizeLastScheduleTask();
290 // low priority target A
291 errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo);
292 ASSERT_EQ(errCode, E_OK);
293 EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_A);
294 EXPECT_EQ(outTaskInfo.taskPrio, Priority::LOW);
295 scheduler.FinalizeLastScheduleTask();
296 // low priority target B
297 errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo);
298 ASSERT_EQ(errCode, E_OK);
299 EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_B);
300 EXPECT_EQ(outTaskInfo.taskPrio, Priority::LOW);
301 scheduler.FinalizeLastScheduleTask();
302 }
303
304 /**
305 * @tc.name: Fragment 001
306 * @tc.desc: Test fragmentation in send and receive
307 * @tc.type: FUNC
308 * @tc.require: AR000BVDGI AR000CQE0M
309 * @tc.author: xiaozhenjian
310 */
311 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment001, TestSize.Level2)
312 {
313 // Preset
314 Message *recvMsgForBB = nullptr;
__anona787b8db0402(const std::string &srcTarget, Message *inMsg) 315 g_commBB->RegOnMessageCallback([&recvMsgForBB](const std::string &srcTarget, Message *inMsg) {
316 recvMsgForBB = inMsg;
317 }, nullptr);
318
319 /**
320 * @tc.steps: step1. connect device A with device B
321 */
322 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
323
324 /**
325 * @tc.steps: step2. device A send message(registered and giant) to device B using communicator AB
326 * @tc.expected: step2. communicator BB received the message
327 */
328 const uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
329 Message *sendMsgForAB = BuildRegedGiantMessage(dataLength);
330 ASSERT_NE(sendMsgForAB, nullptr);
331 SendConfig conf = {false, false, 0};
332 int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsgForAB, conf);
333 EXPECT_EQ(errCode, E_OK);
334 std::this_thread::sleep_for(std::chrono::milliseconds(2600)); // Wait 2600 ms to make sure send done
335 ASSERT_NE(recvMsgForBB, nullptr);
336 ASSERT_EQ(recvMsgForBB->GetMessageId(), REGED_GIANT_MSG_ID);
337
338 /**
339 * @tc.steps: step3. Compare received data with send data
340 * @tc.expected: step3. equal
341 */
342 Message *oriMsgForAB = BuildRegedGiantMessage(dataLength);
343 ASSERT_NE(oriMsgForAB, nullptr);
344 const RegedGiantObject *oriObjForAB = oriMsgForAB->GetObject<RegedGiantObject>();
345 ASSERT_NE(oriObjForAB, nullptr);
346 const RegedGiantObject *recvObjForBB = recvMsgForBB->GetObject<RegedGiantObject>();
347 ASSERT_NE(recvObjForBB, nullptr);
348 bool isEqual = RegedGiantObject::CheckEqual(*oriObjForAB, *recvObjForBB);
349 EXPECT_EQ(isEqual, true);
350
351 // CleanUp
352 delete oriMsgForAB;
353 oriMsgForAB = nullptr;
354 delete recvMsgForBB;
355 recvMsgForBB = nullptr;
356 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
357 }
358
359 /**
360 * @tc.name: Fragment 002
361 * @tc.desc: Test fragmentation in partial loss
362 * @tc.type: FUNC
363 * @tc.require: AR000BVDGI AR000CQE0M
364 * @tc.author: xiaozhenjian
365 */
366 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment002, TestSize.Level2)
367 {
368 // Preset
369 Message *recvMsgForCC = nullptr;
__anona787b8db0502(const std::string &srcTarget, Message *inMsg) 370 g_commCC->RegOnMessageCallback([&recvMsgForCC](const std::string &srcTarget, Message *inMsg) {
371 recvMsgForCC = inMsg;
372 }, nullptr);
373
374 /**
375 * @tc.steps: step1. connect device B with device C
376 */
377 AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
378 std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure quiet
379
380 /**
381 * @tc.steps: step2. device B simulate partial loss
382 */
383 g_envDeviceB.adapterHandle->SimulateSendPartialLoss();
384
385 /**
386 * @tc.steps: step3. device B send message(registered and giant) to device C using communicator BC
387 * @tc.expected: step3. communicator CC not receive the message
388 */
389 uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
390 Message *sendMsgForBC = BuildRegedGiantMessage(dataLength);
391 ASSERT_NE(sendMsgForBC, nullptr);
392 SendConfig conf = {false, false, 0};
393 int errCode = g_commBC->SendMessage(DEVICE_NAME_C, sendMsgForBC, conf);
394 EXPECT_EQ(errCode, E_OK);
395 std::this_thread::sleep_for(std::chrono::milliseconds(2600)); // Wait 2600 ms to make sure send done
396 EXPECT_EQ(recvMsgForCC, nullptr);
397
398 /**
399 * @tc.steps: step4. device B not simulate partial loss
400 */
401 g_envDeviceB.adapterHandle->SimulateSendPartialLossClear();
402
403 /**
404 * @tc.steps: step5. device B send message(registered and giant) to device C using communicator BC
405 * @tc.expected: step5. communicator CC received the message, the length equal to the one that is second send
406 */
407 dataLength = 17 * 1024 * 1024; // 17 MB, 1024 is scale
408 Message *resendMsgForBC = BuildRegedGiantMessage(dataLength);
409 ASSERT_NE(resendMsgForBC, nullptr);
410 errCode = g_commBC->SendMessage(DEVICE_NAME_C, resendMsgForBC, conf);
411 EXPECT_EQ(errCode, E_OK);
412 std::this_thread::sleep_for(std::chrono::milliseconds(3400)); // Wait 3400 ms to make sure send done
413 ASSERT_NE(recvMsgForCC, nullptr);
414 ASSERT_EQ(recvMsgForCC->GetMessageId(), REGED_GIANT_MSG_ID);
415 const RegedGiantObject *recvObjForCC = recvMsgForCC->GetObject<RegedGiantObject>();
416 ASSERT_NE(recvObjForCC, nullptr);
417 EXPECT_EQ(dataLength, recvObjForCC->rawData_.size());
418
419 // CleanUp
420 delete recvMsgForCC;
421 recvMsgForCC = nullptr;
422 AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
423 }
424
425 /**
426 * @tc.name: Fragment 003
427 * @tc.desc: Test fragmentation simultaneously
428 * @tc.type: FUNC
429 * @tc.require: AR000BVDGI AR000CQE0M
430 * @tc.author: xiaozhenjian
431 */
432 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment003, TestSize.Level3)
433 {
434 // Preset
435 std::atomic<int> count {0};
__anona787b8db0602(const std::string &srcTarget, Message *inMsg) 436 OnMessageCallback callback = [&count](const std::string &srcTarget, Message *inMsg) {
437 delete inMsg;
438 inMsg = nullptr;
439 count.fetch_add(1, std::memory_order_seq_cst);
440 };
441 g_commBB->RegOnMessageCallback(callback, nullptr);
442 g_commBC->RegOnMessageCallback(callback, nullptr);
443
444 /**
445 * @tc.steps: step1. connect device A with device B, then device B with device C
446 */
447 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
448 AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
449 std::this_thread::sleep_for(std::chrono::milliseconds(400)); // Wait 400 ms to make sure quiet
450
451 /**
452 * @tc.steps: step2. device A and device C simulate send block
453 */
454 g_envDeviceA.adapterHandle->SimulateSendBlock();
455 g_envDeviceC.adapterHandle->SimulateSendBlock();
456
457 /**
458 * @tc.steps: step3. device A send message(registered and giant) to device B using communicator AB
459 */
460 uint32_t dataLength = 23 * 1024 * 1024; // 23 MB, 1024 is scale
461 Message *sendMsgForAB = BuildRegedGiantMessage(dataLength);
462 ASSERT_NE(sendMsgForAB, nullptr);
463 SendConfig conf = {false, false, 0};
464 int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsgForAB, conf);
465 EXPECT_EQ(errCode, E_OK);
466
467 /**
468 * @tc.steps: step4. device C send message(registered and giant) to device B using communicator CC
469 */
470 Message *sendMsgForCC = BuildRegedGiantMessage(dataLength);
471 ASSERT_NE(sendMsgForCC, nullptr);
472 errCode = g_commCC->SendMessage(DEVICE_NAME_B, sendMsgForCC, conf);
473 EXPECT_EQ(errCode, E_OK);
474
475 /**
476 * @tc.steps: step5. device A and device C not simulate send block
477 * @tc.expected: step5. communicator BB and BV received the message
478 */
479 g_envDeviceA.adapterHandle->SimulateSendBlockClear();
480 g_envDeviceC.adapterHandle->SimulateSendBlockClear();
481 std::this_thread::sleep_for(std::chrono::milliseconds(9200)); // Wait 9200 ms to make sure send done
482 EXPECT_EQ(count, 2); // 2 combined message received
483
484 // CleanUp
485 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
486 AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
487 }
488
489 namespace {
ClearPreviousTestCaseInfluence()490 void ClearPreviousTestCaseInfluence()
491 {
492 ReleaseAllCommunicator();
493 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
494 AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
495 AdapterStub::ConnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
496 std::this_thread::sleep_for(std::chrono::seconds(10)); // Wait 10 s to make sure all thread quiet
497 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
498 AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
499 AdapterStub::DisconnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
500 AllocAllCommunicator();
501 }
502 }
503
504 /**
505 * @tc.name: ReliableOnline 001
506 * @tc.desc: Test device online reliability
507 * @tc.type: FUNC
508 * @tc.require: AR000BVDGJ AR000CQE0N
509 * @tc.author: xiaozhenjian
510 */
511 HWTEST_F(DistributedDBCommunicatorDeepTest, ReliableOnline001, TestSize.Level2)
512 {
513 // Preset
514 ClearPreviousTestCaseInfluence();
515 std::atomic<int> count {0};
__anona787b8db0802(const std::string &target, bool isConnect) 516 OnConnectCallback callback = [&count](const std::string &target, bool isConnect) {
517 if (isConnect) {
518 count.fetch_add(1, std::memory_order_seq_cst);
519 }
520 };
521 g_commAA->RegOnConnectCallback(callback, nullptr);
522 g_commAB->RegOnConnectCallback(callback, nullptr);
523 g_commBB->RegOnConnectCallback(callback, nullptr);
524 g_commBC->RegOnConnectCallback(callback, nullptr);
525 g_commCC->RegOnConnectCallback(callback, nullptr);
526 g_commCA->RegOnConnectCallback(callback, nullptr);
527
528 /**
529 * @tc.steps: step1. device A and device B and device C simulate send total loss
530 */
531 g_envDeviceA.adapterHandle->SimulateSendTotalLoss();
532 g_envDeviceB.adapterHandle->SimulateSendTotalLoss();
533 g_envDeviceC.adapterHandle->SimulateSendTotalLoss();
534
535 /**
536 * @tc.steps: step2. connect device A with device B, device B with device C, device C with device A
537 */
538 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
539 AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
540 AdapterStub::ConnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
541
542 /**
543 * @tc.steps: step3. wait a long time
544 * @tc.expected: step3. no communicator received the online callback
545 */
546 std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure quiet
547 EXPECT_EQ(count, 0); // no online callback received
548
549 /**
550 * @tc.steps: step4. device A and device B and device C not simulate send total loss
551 */
552 g_envDeviceA.adapterHandle->SimulateSendTotalLossClear();
553 g_envDeviceB.adapterHandle->SimulateSendTotalLossClear();
554 g_envDeviceC.adapterHandle->SimulateSendTotalLossClear();
555 std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure send done
556 EXPECT_EQ(count, 6); // 6 online callback received in total
557
558 // CleanUp
559 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
560 AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
561 AdapterStub::DisconnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
562 }
563