• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 #include <new>
18 #include <thread>
19 #include "db_errno.h"
20 #include "distributeddb_communicator_common.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "log_print.h"
23 #include "message.h"
24 #include "serial_buffer.h"
25 
26 using namespace std;
27 using namespace testing::ext;
28 using namespace DistributedDB;
29 
30 namespace {
31     EnvHandle g_envDeviceA;
32     EnvHandle g_envDeviceB;
33     EnvHandle g_envDeviceC;
34     ICommunicator *g_commAA = nullptr;
35     ICommunicator *g_commAB = nullptr;
36     ICommunicator *g_commBB = nullptr;
37     ICommunicator *g_commBC = nullptr;
38     ICommunicator *g_commCC = nullptr;
39     ICommunicator *g_commCA = nullptr;
40 }
41 
42 class DistributedDBCommunicatorDeepTest : public testing::Test {
43 public:
44     static void SetUpTestCase(void);
45     static void TearDownTestCase(void);
46     void SetUp();
47     void TearDown();
48 };
49 
SetUpTestCase(void)50 void DistributedDBCommunicatorDeepTest::SetUpTestCase(void)
51 {
52     /**
53      * @tc.setup: Create and init CommunicatorAggregator and AdapterStub
54      */
55     LOGI("[UT][DeepTest][SetUpTestCase] Enter.");
56     bool errCode = SetUpEnv(g_envDeviceA, DEVICE_NAME_A);
57     ASSERT_EQ(errCode, true);
58     errCode = SetUpEnv(g_envDeviceB, DEVICE_NAME_B);
59     ASSERT_EQ(errCode, true);
60     errCode = SetUpEnv(g_envDeviceC, DEVICE_NAME_C);
61     ASSERT_EQ(errCode, true);
62     DoRegTransformFunction();
63     CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
64 }
65 
TearDownTestCase(void)66 void DistributedDBCommunicatorDeepTest::TearDownTestCase(void)
67 {
68     /**
69      * @tc.teardown: Finalize and release CommunicatorAggregator and AdapterStub
70      */
71     LOGI("[UT][DeepTest][TearDownTestCase] Enter.");
72     std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure all thread quiet and memory released
73     TearDownEnv(g_envDeviceA);
74     TearDownEnv(g_envDeviceB);
75     TearDownEnv(g_envDeviceC);
76     CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
77 }
78 
79 namespace {
AllocAllCommunicator()80 void AllocAllCommunicator()
81 {
82     int errorNo = E_OK;
83     g_commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
84     ASSERT_NOT_NULL_AND_ACTIVATE(g_commAA);
85     g_commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
86     ASSERT_NOT_NULL_AND_ACTIVATE(g_commAB);
87     g_commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
88     ASSERT_NOT_NULL_AND_ACTIVATE(g_commBB);
89     g_commBC = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_C, errorNo);
90     ASSERT_NOT_NULL_AND_ACTIVATE(g_commBC);
91     g_commCC = g_envDeviceC.commAggrHandle->AllocCommunicator(LABEL_C, errorNo);
92     ASSERT_NOT_NULL_AND_ACTIVATE(g_commCC);
93     g_commCA = g_envDeviceC.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
94     ASSERT_NOT_NULL_AND_ACTIVATE(g_commCA);
95 }
96 
ReleaseAllCommunicator()97 void ReleaseAllCommunicator()
98 {
99     g_envDeviceA.commAggrHandle->ReleaseCommunicator(g_commAA);
100     g_commAA = nullptr;
101     g_envDeviceA.commAggrHandle->ReleaseCommunicator(g_commAB);
102     g_commAB = nullptr;
103     g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBB);
104     g_commBB = nullptr;
105     g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBC);
106     g_commBC = nullptr;
107     g_envDeviceC.commAggrHandle->ReleaseCommunicator(g_commCC);
108     g_commCC = nullptr;
109     g_envDeviceC.commAggrHandle->ReleaseCommunicator(g_commCA);
110     g_commCA = nullptr;
111 }
112 }
113 
SetUp()114 void DistributedDBCommunicatorDeepTest::SetUp()
115 {
116     DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
117     /**
118      * @tc.setup: Alloc communicator AA, AB, BB, BC, CC, CA
119      */
120     AllocAllCommunicator();
121 }
122 
TearDown()123 void DistributedDBCommunicatorDeepTest::TearDown()
124 {
125     /**
126      * @tc.teardown: Release communicator AA, AB, BB, BC, CC, CA
127      */
128     ReleaseAllCommunicator();
129     std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure all thread quiet
130 }
131 
132 /**
133  * @tc.name: WaitAndRetrySend 001
134  * @tc.desc: Test send retry semantic
135  * @tc.type: FUNC
136  * @tc.require: AR000BVDGI AR000CQE0M
137  * @tc.author: xiaozhenjian
138  */
139 HWTEST_F(DistributedDBCommunicatorDeepTest, WaitAndRetrySend001, TestSize.Level2)
140 {
141     // Preset
142     Message *msgForBB = nullptr;
__anona787b8db0302(const std::string &srcTarget, Message *inMsg) 143     g_commBB->RegOnMessageCallback([&msgForBB](const std::string &srcTarget, Message *inMsg) {
144         msgForBB = inMsg;
145     }, nullptr);
146 
147     /**
148      * @tc.steps: step1. connect device A with device B
149      */
150     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
151     std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure quiet
152 
153     /**
154      * @tc.steps: step2. device A simulate send retry
155      */
156     g_envDeviceA.adapterHandle->SimulateSendRetry(DEVICE_NAME_B);
157 
158     /**
159      * @tc.steps: step3. device A send message to device B using communicator AB
160      * @tc.expected: step3. communicator BB received no message
161      */
162     Message *msgForAB = BuildRegedTinyMessage();
163     ASSERT_NE(msgForAB, nullptr);
164     SendConfig conf = {true, false, 0};
165     int errCode = g_commAB->SendMessage(DEVICE_NAME_B, msgForAB, conf);
166     EXPECT_EQ(errCode, E_OK);
167     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
168     EXPECT_EQ(msgForBB, nullptr);
169 
170     /**
171      * @tc.steps: step4. device A simulate sendable feedback
172      * @tc.expected: step4. communicator BB received the message
173      */
174     g_envDeviceA.adapterHandle->SimulateSendRetryClear(DEVICE_NAME_B);
175     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
176     EXPECT_NE(msgForBB, nullptr);
177     delete msgForBB;
178     msgForBB = nullptr;
179 
180     // CleanUp
181     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
182 }
183 
CreateBufferThenAddIntoScheduler(SendTaskScheduler & scheduler,const std::string & dstTarget,Priority inPrio)184 static int CreateBufferThenAddIntoScheduler(SendTaskScheduler &scheduler, const std::string &dstTarget, Priority inPrio)
185 {
186     SerialBuffer *eachBuff = new (std::nothrow) SerialBuffer();
187     if (eachBuff == nullptr) {
188         return -E_OUT_OF_MEMORY;
189     }
190     int errCode = eachBuff->AllocBufferByTotalLength(100, 0); // 100 totallen without header
191     if (errCode != E_OK) {
192         delete eachBuff;
193         eachBuff = nullptr;
194         return errCode;
195     }
196     SendTask task{eachBuff, dstTarget};
197     errCode = scheduler.AddSendTaskIntoSchedule(task, inPrio);
198     if (errCode != E_OK) {
199         delete eachBuff;
200         eachBuff = nullptr;
201         return errCode;
202     }
203     return E_OK;
204 }
205 
206 /**
207  * @tc.name: SendSchedule 001
208  * @tc.desc: Test schedule in Priority order than in send order
209  * @tc.type: FUNC
210  * @tc.require: AR000BVDGI AR000CQE0M
211  * @tc.author: xiaozhenjian
212  */
213 HWTEST_F(DistributedDBCommunicatorDeepTest, SendSchedule001, TestSize.Level2)
214 {
215     // Preset
216     SendTaskScheduler scheduler;
217     scheduler.Initialize();
218 
219     /**
220      * @tc.steps: step1. Add low priority target A buffer to schecduler
221      */
222     int errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_A, Priority::LOW);
223     EXPECT_EQ(errCode, E_OK);
224 
225     /**
226      * @tc.steps: step2. Add low priority target B buffer to schecduler
227      */
228     errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_B, Priority::LOW);
229     EXPECT_EQ(errCode, E_OK);
230 
231     /**
232      * @tc.steps: step3. Add normal priority target B buffer to schecduler
233      */
234     errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_B, Priority::NORMAL);
235     EXPECT_EQ(errCode, E_OK);
236 
237     /**
238      * @tc.steps: step4. Add normal priority target C buffer to schecduler
239      */
240     errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_C, Priority::NORMAL);
241     EXPECT_EQ(errCode, E_OK);
242 
243     /**
244      * @tc.steps: step5. Add high priority target C buffer to schecduler
245      */
246     errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_C, Priority::HIGH);
247     EXPECT_EQ(errCode, E_OK);
248 
249     /**
250      * @tc.steps: step6. Add high priority target A buffer to schecduler
251      */
252     errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_A, Priority::HIGH);
253     EXPECT_EQ(errCode, E_OK);
254 
255     /**
256      * @tc.steps: step7. schedule out buffers one by one
257      * @tc.expected: step7. the order is: high priority target C
258      *                                    high priority target A
259      *                                    normal priority target B
260      *                                    normal priority target C
261      *                                    low priority target A
262      *                                    low priority target B
263      */
264     SendTask outTask;
265     SendTaskInfo outTaskInfo;
266     // high priority target C
267     errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo);
268     ASSERT_EQ(errCode, E_OK);
269     EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_C);
270     EXPECT_EQ(outTaskInfo.taskPrio, Priority::HIGH);
271     scheduler.FinalizeLastScheduleTask();
272     // high priority target A
273     errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo);
274     ASSERT_EQ(errCode, E_OK);
275     EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_A);
276     EXPECT_EQ(outTaskInfo.taskPrio, Priority::HIGH);
277     scheduler.FinalizeLastScheduleTask();
278     // normal priority target B
279     errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo);
280     ASSERT_EQ(errCode, E_OK);
281     EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_B);
282     EXPECT_EQ(outTaskInfo.taskPrio, Priority::NORMAL);
283     scheduler.FinalizeLastScheduleTask();
284     // normal priority target C
285     errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo);
286     ASSERT_EQ(errCode, E_OK);
287     EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_C);
288     EXPECT_EQ(outTaskInfo.taskPrio, Priority::NORMAL);
289     scheduler.FinalizeLastScheduleTask();
290     // low priority target A
291     errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo);
292     ASSERT_EQ(errCode, E_OK);
293     EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_A);
294     EXPECT_EQ(outTaskInfo.taskPrio, Priority::LOW);
295     scheduler.FinalizeLastScheduleTask();
296     // low priority target B
297     errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo);
298     ASSERT_EQ(errCode, E_OK);
299     EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_B);
300     EXPECT_EQ(outTaskInfo.taskPrio, Priority::LOW);
301     scheduler.FinalizeLastScheduleTask();
302 }
303 
304 /**
305  * @tc.name: Fragment 001
306  * @tc.desc: Test fragmentation in send and receive
307  * @tc.type: FUNC
308  * @tc.require: AR000BVDGI AR000CQE0M
309  * @tc.author: xiaozhenjian
310  */
311 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment001, TestSize.Level2)
312 {
313     // Preset
314     Message *recvMsgForBB = nullptr;
__anona787b8db0402(const std::string &srcTarget, Message *inMsg) 315     g_commBB->RegOnMessageCallback([&recvMsgForBB](const std::string &srcTarget, Message *inMsg) {
316         recvMsgForBB = inMsg;
317     }, nullptr);
318 
319     /**
320      * @tc.steps: step1. connect device A with device B
321      */
322     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
323 
324     /**
325      * @tc.steps: step2. device A send message(registered and giant) to device B using communicator AB
326      * @tc.expected: step2. communicator BB received the message
327      */
328     const uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
329     Message *sendMsgForAB = BuildRegedGiantMessage(dataLength);
330     ASSERT_NE(sendMsgForAB, nullptr);
331     SendConfig conf = {false, false, 0};
332     int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsgForAB, conf);
333     EXPECT_EQ(errCode, E_OK);
334     std::this_thread::sleep_for(std::chrono::milliseconds(2600)); // Wait 2600 ms to make sure send done
335     ASSERT_NE(recvMsgForBB, nullptr);
336     ASSERT_EQ(recvMsgForBB->GetMessageId(), REGED_GIANT_MSG_ID);
337 
338     /**
339      * @tc.steps: step3. Compare received data with send data
340      * @tc.expected: step3. equal
341      */
342     Message *oriMsgForAB = BuildRegedGiantMessage(dataLength);
343     ASSERT_NE(oriMsgForAB, nullptr);
344     const RegedGiantObject *oriObjForAB = oriMsgForAB->GetObject<RegedGiantObject>();
345     ASSERT_NE(oriObjForAB, nullptr);
346     const RegedGiantObject *recvObjForBB = recvMsgForBB->GetObject<RegedGiantObject>();
347     ASSERT_NE(recvObjForBB, nullptr);
348     bool isEqual = RegedGiantObject::CheckEqual(*oriObjForAB, *recvObjForBB);
349     EXPECT_EQ(isEqual, true);
350 
351     // CleanUp
352     delete oriMsgForAB;
353     oriMsgForAB = nullptr;
354     delete recvMsgForBB;
355     recvMsgForBB = nullptr;
356     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
357 }
358 
359 /**
360  * @tc.name: Fragment 002
361  * @tc.desc: Test fragmentation in partial loss
362  * @tc.type: FUNC
363  * @tc.require: AR000BVDGI AR000CQE0M
364  * @tc.author: xiaozhenjian
365  */
366 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment002, TestSize.Level2)
367 {
368     // Preset
369     Message *recvMsgForCC = nullptr;
__anona787b8db0502(const std::string &srcTarget, Message *inMsg) 370     g_commCC->RegOnMessageCallback([&recvMsgForCC](const std::string &srcTarget, Message *inMsg) {
371         recvMsgForCC = inMsg;
372     }, nullptr);
373 
374     /**
375      * @tc.steps: step1. connect device B with device C
376      */
377     AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
378     std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure quiet
379 
380     /**
381      * @tc.steps: step2. device B simulate partial loss
382      */
383     g_envDeviceB.adapterHandle->SimulateSendPartialLoss();
384 
385     /**
386      * @tc.steps: step3. device B send message(registered and giant) to device C using communicator BC
387      * @tc.expected: step3. communicator CC not receive the message
388      */
389     uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
390     Message *sendMsgForBC = BuildRegedGiantMessage(dataLength);
391     ASSERT_NE(sendMsgForBC, nullptr);
392     SendConfig conf = {false, false, 0};
393     int errCode = g_commBC->SendMessage(DEVICE_NAME_C, sendMsgForBC, conf);
394     EXPECT_EQ(errCode, E_OK);
395     std::this_thread::sleep_for(std::chrono::milliseconds(2600)); // Wait 2600 ms to make sure send done
396     EXPECT_EQ(recvMsgForCC, nullptr);
397 
398     /**
399      * @tc.steps: step4. device B not simulate partial loss
400      */
401     g_envDeviceB.adapterHandle->SimulateSendPartialLossClear();
402 
403     /**
404      * @tc.steps: step5. device B send message(registered and giant) to device C using communicator BC
405      * @tc.expected: step5. communicator CC received the message, the length equal to the one that is second send
406      */
407     dataLength = 17 * 1024 * 1024; // 17 MB, 1024 is scale
408     Message *resendMsgForBC = BuildRegedGiantMessage(dataLength);
409     ASSERT_NE(resendMsgForBC, nullptr);
410     errCode = g_commBC->SendMessage(DEVICE_NAME_C, resendMsgForBC, conf);
411     EXPECT_EQ(errCode, E_OK);
412     std::this_thread::sleep_for(std::chrono::milliseconds(3400)); // Wait 3400 ms to make sure send done
413     ASSERT_NE(recvMsgForCC, nullptr);
414     ASSERT_EQ(recvMsgForCC->GetMessageId(), REGED_GIANT_MSG_ID);
415     const RegedGiantObject *recvObjForCC = recvMsgForCC->GetObject<RegedGiantObject>();
416     ASSERT_NE(recvObjForCC, nullptr);
417     EXPECT_EQ(dataLength, recvObjForCC->rawData_.size());
418 
419     // CleanUp
420     delete recvMsgForCC;
421     recvMsgForCC = nullptr;
422     AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
423 }
424 
425 /**
426  * @tc.name: Fragment 003
427  * @tc.desc: Test fragmentation simultaneously
428  * @tc.type: FUNC
429  * @tc.require: AR000BVDGI AR000CQE0M
430  * @tc.author: xiaozhenjian
431  */
432 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment003, TestSize.Level3)
433 {
434     // Preset
435     std::atomic<int> count {0};
__anona787b8db0602(const std::string &srcTarget, Message *inMsg) 436     OnMessageCallback callback = [&count](const std::string &srcTarget, Message *inMsg) {
437         delete inMsg;
438         inMsg = nullptr;
439         count.fetch_add(1, std::memory_order_seq_cst);
440     };
441     g_commBB->RegOnMessageCallback(callback, nullptr);
442     g_commBC->RegOnMessageCallback(callback, nullptr);
443 
444     /**
445      * @tc.steps: step1. connect device A with device B, then device B with device C
446      */
447     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
448     AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
449     std::this_thread::sleep_for(std::chrono::milliseconds(400)); // Wait 400 ms to make sure quiet
450 
451     /**
452      * @tc.steps: step2. device A and device C simulate send block
453      */
454     g_envDeviceA.adapterHandle->SimulateSendBlock();
455     g_envDeviceC.adapterHandle->SimulateSendBlock();
456 
457     /**
458      * @tc.steps: step3. device A send message(registered and giant) to device B using communicator AB
459      */
460     uint32_t dataLength = 23 * 1024 * 1024; // 23 MB, 1024 is scale
461     Message *sendMsgForAB = BuildRegedGiantMessage(dataLength);
462     ASSERT_NE(sendMsgForAB, nullptr);
463     SendConfig conf = {false, false, 0};
464     int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsgForAB, conf);
465     EXPECT_EQ(errCode, E_OK);
466 
467     /**
468      * @tc.steps: step4. device C send message(registered and giant) to device B using communicator CC
469      */
470     Message *sendMsgForCC = BuildRegedGiantMessage(dataLength);
471     ASSERT_NE(sendMsgForCC, nullptr);
472     errCode = g_commCC->SendMessage(DEVICE_NAME_B, sendMsgForCC, conf);
473     EXPECT_EQ(errCode, E_OK);
474 
475     /**
476      * @tc.steps: step5. device A and device C not simulate send block
477      * @tc.expected: step5. communicator BB and BV received the message
478      */
479     g_envDeviceA.adapterHandle->SimulateSendBlockClear();
480     g_envDeviceC.adapterHandle->SimulateSendBlockClear();
481     std::this_thread::sleep_for(std::chrono::milliseconds(9200)); // Wait 9200 ms to make sure send done
482     EXPECT_EQ(count, 2); // 2 combined message received
483 
484     // CleanUp
485     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
486     AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
487 }
488 
489 namespace {
ClearPreviousTestCaseInfluence()490 void ClearPreviousTestCaseInfluence()
491 {
492     ReleaseAllCommunicator();
493     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
494     AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
495     AdapterStub::ConnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
496     std::this_thread::sleep_for(std::chrono::seconds(10)); // Wait 10 s to make sure all thread quiet
497     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
498     AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
499     AdapterStub::DisconnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
500     AllocAllCommunicator();
501 }
502 }
503 
504 /**
505  * @tc.name: ReliableOnline 001
506  * @tc.desc: Test device online reliability
507  * @tc.type: FUNC
508  * @tc.require: AR000BVDGJ AR000CQE0N
509  * @tc.author: xiaozhenjian
510  */
511 HWTEST_F(DistributedDBCommunicatorDeepTest, ReliableOnline001, TestSize.Level2)
512 {
513     // Preset
514     ClearPreviousTestCaseInfluence();
515     std::atomic<int> count {0};
__anona787b8db0802(const std::string &target, bool isConnect) 516     OnConnectCallback callback = [&count](const std::string &target, bool isConnect) {
517         if (isConnect) {
518             count.fetch_add(1, std::memory_order_seq_cst);
519         }
520     };
521     g_commAA->RegOnConnectCallback(callback, nullptr);
522     g_commAB->RegOnConnectCallback(callback, nullptr);
523     g_commBB->RegOnConnectCallback(callback, nullptr);
524     g_commBC->RegOnConnectCallback(callback, nullptr);
525     g_commCC->RegOnConnectCallback(callback, nullptr);
526     g_commCA->RegOnConnectCallback(callback, nullptr);
527 
528     /**
529      * @tc.steps: step1. device A and device B and device C simulate send total loss
530      */
531     g_envDeviceA.adapterHandle->SimulateSendTotalLoss();
532     g_envDeviceB.adapterHandle->SimulateSendTotalLoss();
533     g_envDeviceC.adapterHandle->SimulateSendTotalLoss();
534 
535     /**
536      * @tc.steps: step2. connect device A with device B, device B with device C, device C with device A
537      */
538     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
539     AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
540     AdapterStub::ConnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
541 
542     /**
543      * @tc.steps: step3. wait a long time
544      * @tc.expected: step3. no communicator received the online callback
545      */
546     std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure quiet
547     EXPECT_EQ(count, 0); // no online callback received
548 
549     /**
550      * @tc.steps: step4. device A and device B and device C not simulate send total loss
551      */
552     g_envDeviceA.adapterHandle->SimulateSendTotalLossClear();
553     g_envDeviceB.adapterHandle->SimulateSendTotalLossClear();
554     g_envDeviceC.adapterHandle->SimulateSendTotalLossClear();
555     std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure send done
556     EXPECT_EQ(count, 6); // 6 online callback received in total
557 
558     // CleanUp
559     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
560     AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
561     AdapterStub::DisconnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
562 }
563