• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 #include <gmock/gmock.h>
18 #include <new>
19 #include <thread>
20 #include "db_errno.h"
21 #include "distributeddb_communicator_common.h"
22 #include "distributeddb_tools_unit_test.h"
23 #include "log_print.h"
24 #include "network_adapter.h"
25 #include "message.h"
26 #include "mock_process_communicator.h"
27 #include "serial_buffer.h"
28 
29 using namespace std;
30 using namespace testing::ext;
31 using namespace DistributedDB;
32 
33 namespace {
34     EnvHandle g_envDeviceA;
35     EnvHandle g_envDeviceB;
36     EnvHandle g_envDeviceC;
37     ICommunicator *g_commAA = nullptr;
38     ICommunicator *g_commAB = nullptr;
39     ICommunicator *g_commBB = nullptr;
40     ICommunicator *g_commBC = nullptr;
41     ICommunicator *g_commCC = nullptr;
42     ICommunicator *g_commCA = nullptr;
43 }
44 
45 class DistributedDBCommunicatorDeepTest : public testing::Test {
46 public:
47     static void SetUpTestCase(void);
48     static void TearDownTestCase(void);
49     void SetUp();
50     void TearDown();
51 };
52 
SetUpTestCase(void)53 void DistributedDBCommunicatorDeepTest::SetUpTestCase(void)
54 {
55     /**
56      * @tc.setup: Create and init CommunicatorAggregator and AdapterStub
57      */
58     LOGI("[UT][DeepTest][SetUpTestCase] Enter.");
59     bool isSuccess = SetUpEnv(g_envDeviceA, DEVICE_NAME_A);
60     ASSERT_EQ(isSuccess, true);
61     isSuccess = SetUpEnv(g_envDeviceB, DEVICE_NAME_B);
62     ASSERT_EQ(isSuccess, true);
63     isSuccess = SetUpEnv(g_envDeviceC, DEVICE_NAME_C);
64     ASSERT_EQ(isSuccess, true);
65     DoRegTransformFunction();
66     CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
67 }
68 
TearDownTestCase(void)69 void DistributedDBCommunicatorDeepTest::TearDownTestCase(void)
70 {
71     /**
72      * @tc.teardown: Finalize and release CommunicatorAggregator and AdapterStub
73      */
74     LOGI("[UT][DeepTest][TearDownTestCase] Enter.");
75     std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure all thread quiet and memory released
76     TearDownEnv(g_envDeviceA);
77     TearDownEnv(g_envDeviceB);
78     TearDownEnv(g_envDeviceC);
79     CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
80 }
81 
82 namespace {
AllocAllCommunicator()83 void AllocAllCommunicator()
84 {
85     int errorNo = E_OK;
86     g_commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
87     ASSERT_NOT_NULL_AND_ACTIVATE(g_commAA, "");
88     g_commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
89     ASSERT_NOT_NULL_AND_ACTIVATE(g_commAB, "");
90     g_commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
91     ASSERT_NOT_NULL_AND_ACTIVATE(g_commBB, "");
92     g_commBC = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_C, errorNo);
93     ASSERT_NOT_NULL_AND_ACTIVATE(g_commBC, "");
94     g_commCC = g_envDeviceC.commAggrHandle->AllocCommunicator(LABEL_C, errorNo);
95     ASSERT_NOT_NULL_AND_ACTIVATE(g_commCC, "");
96     g_commCA = g_envDeviceC.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
97     ASSERT_NOT_NULL_AND_ACTIVATE(g_commCA, "");
98 }
99 
ReleaseAllCommunicator()100 void ReleaseAllCommunicator()
101 {
102     g_envDeviceA.commAggrHandle->ReleaseCommunicator(g_commAA);
103     g_commAA = nullptr;
104     g_envDeviceA.commAggrHandle->ReleaseCommunicator(g_commAB);
105     g_commAB = nullptr;
106     g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBB);
107     g_commBB = nullptr;
108     g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBC);
109     g_commBC = nullptr;
110     g_envDeviceC.commAggrHandle->ReleaseCommunicator(g_commCC);
111     g_commCC = nullptr;
112     g_envDeviceC.commAggrHandle->ReleaseCommunicator(g_commCA);
113     g_commCA = nullptr;
114 }
115 }
116 
SetUp()117 void DistributedDBCommunicatorDeepTest::SetUp()
118 {
119     DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
120     /**
121      * @tc.setup: Alloc communicator AA, AB, BB, BC, CC, CA
122      */
123     AllocAllCommunicator();
124 }
125 
TearDown()126 void DistributedDBCommunicatorDeepTest::TearDown()
127 {
128     /**
129      * @tc.teardown: Release communicator AA, AB, BB, BC, CC, CA
130      */
131     ReleaseAllCommunicator();
132     g_envDeviceA.commAggrHandle->ResetRetryCount();
133     g_envDeviceB.commAggrHandle->ResetRetryCount();
134     g_envDeviceC.commAggrHandle->ResetRetryCount();
135     std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure all thread quiet
136 }
137 
138 /**
139  * @tc.name: WaitAndRetrySend 001
140  * @tc.desc: Test send retry semantic
141  * @tc.type: FUNC
142  * @tc.require:
143  * @tc.author: xiaozhenjian
144  */
145 HWTEST_F(DistributedDBCommunicatorDeepTest, WaitAndRetrySend001, TestSize.Level2)
146 {
147     // Preset
148     Message *msgForBB = nullptr;
__anon5d3d689c0302(const std::string &srcTarget, Message *inMsg) 149     g_commBB->RegOnMessageCallback([&msgForBB](const std::string &srcTarget, Message *inMsg) {
150         msgForBB = inMsg;
151         return E_OK;
152     }, nullptr);
153     Message *msgForCA = nullptr;
__anon5d3d689c0402(const std::string &srcTarget, Message *inMsg) 154     g_commCA->RegOnMessageCallback([&msgForCA](const std::string &srcTarget, Message *inMsg) {
155         msgForCA = inMsg;
156         return E_OK;
157     }, nullptr);
158 
159     /**
160      * @tc.steps: step1. connect device A with device B
161      */
162     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
163     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceC.adapterHandle);
164     std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure quiet
165 
166     /**
167      * @tc.steps: step2. device A simulate send retry
168      */
169     g_envDeviceA.adapterHandle->SimulateSendRetry(DEVICE_NAME_B);
170 
171     /**
172      * @tc.steps: step3. device A send message to device B using communicator AB
173      * @tc.expected: step3. communicator BB received no message
174      */
175     Message *msgForAB = BuildRegedTinyMessage();
176     ASSERT_NE(msgForAB, nullptr);
177     SendConfig conf = {true, false, true, 0};
178     int errCode = g_commAB->SendMessage(DEVICE_NAME_B, msgForAB, conf);
179     EXPECT_EQ(errCode, E_OK);
180 
181     Message *msgForAA = BuildRegedTinyMessage();
182     ASSERT_NE(msgForAA, nullptr);
183     errCode = g_commAA->SendMessage(DEVICE_NAME_C, msgForAA, conf);
184     EXPECT_EQ(errCode, E_OK);
185     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
186     EXPECT_EQ(msgForBB, nullptr);
187     EXPECT_NE(msgForCA, nullptr);
188     delete msgForCA;
189     msgForCA = nullptr;
190 
191     /**
192      * @tc.steps: step4. device A simulate sendable feedback
193      * @tc.expected: step4. communicator BB received the message
194      */
195     g_envDeviceA.adapterHandle->SimulateSendRetryClear(DEVICE_NAME_B);
196     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
197     EXPECT_NE(msgForBB, nullptr);
198     delete msgForBB;
199     msgForBB = nullptr;
200 
201     // CleanUp
202     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
203     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceC.adapterHandle);
204 }
205 
206 /**
207  * @tc.name: WaitAndRetrySend002
208  * @tc.desc: Test send return retry but task not retry
209  * @tc.type: FUNC
210  * @tc.require:
211  * @tc.author: liaoyonghuang
212  */
213 HWTEST_F(DistributedDBCommunicatorDeepTest, WaitAndRetrySend002, TestSize.Level2)
214 {
215     // Preset
216     Message *msgForCA = nullptr;
__anon5d3d689c0502(const std::string &srcTarget, Message *inMsg) 217     g_commCA->RegOnMessageCallback([&msgForCA](const std::string &srcTarget, Message *inMsg) {
218         msgForCA = inMsg;
219         return E_OK;
220     }, nullptr);
221 
222     /**
223      * @tc.steps: step1. connect device A with device B
224      */
225     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceC.adapterHandle);
226     std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure quiet
227 
228     /**
229      * @tc.steps: step2. device A simulate send retry
230      */
231     g_envDeviceA.adapterHandle->SimulateSendRetry(DEVICE_NAME_B);
232 
233     /**
234      * @tc.steps: step3. device A send message to device B using communicator AB
235      * @tc.expected: step3. communicator BB received no message
236      */
237     Message *msgForAB = BuildRegedTinyMessage();
238     ASSERT_NE(msgForAB, nullptr);
239     SendConfig conf = {true, false, false, 0};
__anon5d3d689c0602(int, int) 240     OnSendEnd onSendEnd = [](int, int) {
241         LOGI("[WaitAndRetrySend002] on send end.");
242     };
243     int errCode = g_commAB->SendMessage(DEVICE_NAME_B, msgForAB, conf, onSendEnd);
244     EXPECT_EQ(errCode, E_OK);
245 
246     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
247     EXPECT_EQ(msgForCA, nullptr);
248 
249     /**
250      * @tc.steps: step4. device A simulate sendable feedback
251      * @tc.expected: step4. communicator BB received the message
252      */
253     g_envDeviceA.adapterHandle->SimulateSendRetryClear(DEVICE_NAME_B);
254     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
255 
256     // CleanUp
257     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceC.adapterHandle);
258 }
259 
CreateBufferThenAddIntoScheduler(SendTaskScheduler & scheduler,const std::string & dstTarget,Priority inPrio)260 static int CreateBufferThenAddIntoScheduler(SendTaskScheduler &scheduler, const std::string &dstTarget, Priority inPrio)
261 {
262     SerialBuffer *eachBuff = new (std::nothrow) SerialBuffer();
263     if (eachBuff == nullptr) {
264         return -E_OUT_OF_MEMORY;
265     }
266     int errCode = eachBuff->AllocBufferByTotalLength(100, 0); // 100 totallen without header
267     if (errCode != E_OK) {
268         delete eachBuff;
269         eachBuff = nullptr;
270         return errCode;
271     }
272     SendTask task{eachBuff, dstTarget, nullptr, 0u};
273     errCode = scheduler.AddSendTaskIntoSchedule(task, inPrio);
274     if (errCode != E_OK) {
275         delete eachBuff;
276         eachBuff = nullptr;
277         return errCode;
278     }
279     return E_OK;
280 }
281 
282 /**
283  * @tc.name: SendSchedule 001
284  * @tc.desc: Test schedule in Priority order than in send order
285  * @tc.type: FUNC
286  * @tc.require:
287  * @tc.author: xiaozhenjian
288  */
289 HWTEST_F(DistributedDBCommunicatorDeepTest, SendSchedule001, TestSize.Level2)
290 {
291     // Preset
292     SendTaskScheduler scheduler;
293     scheduler.Initialize();
294 
295     /**
296      * @tc.steps: step1. Add low priority target A buffer to schecduler
297      */
298     int errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_A, Priority::LOW);
299     EXPECT_EQ(errCode, E_OK);
300 
301     /**
302      * @tc.steps: step2. Add low priority target B buffer to schecduler
303      */
304     errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_B, Priority::LOW);
305     EXPECT_EQ(errCode, E_OK);
306 
307     /**
308      * @tc.steps: step3. Add normal priority target B buffer to schecduler
309      */
310     errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_B, Priority::NORMAL);
311     EXPECT_EQ(errCode, E_OK);
312 
313     /**
314      * @tc.steps: step4. Add normal priority target C buffer to schecduler
315      */
316     errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_C, Priority::NORMAL);
317     EXPECT_EQ(errCode, E_OK);
318 
319     /**
320      * @tc.steps: step5. Add high priority target C buffer to schecduler
321      */
322     errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_C, Priority::HIGH);
323     EXPECT_EQ(errCode, E_OK);
324 
325     /**
326      * @tc.steps: step6. Add high priority target A buffer to schecduler
327      */
328     errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_A, Priority::HIGH);
329     EXPECT_EQ(errCode, E_OK);
330 
331     /**
332      * @tc.steps: step7. schedule out buffers one by one
333      * @tc.expected: step7. the order is: high priority target C
334      *                                    high priority target A
335      *                                    normal priority target B
336      *                                    normal priority target C
337      *                                    low priority target A
338      *                                    low priority target B
339      */
340     SendTask outTask;
341     SendTaskInfo outTaskInfo;
342     uint32_t totalLength = 0;
343     // high priority target C
344     errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
345     ASSERT_EQ(errCode, E_OK);
346     EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_C);
347     EXPECT_EQ(outTaskInfo.taskPrio, Priority::HIGH);
348     scheduler.FinalizeLastScheduleTask();
349     // high priority target A
350     errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
351     ASSERT_EQ(errCode, E_OK);
352     EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_A);
353     EXPECT_EQ(outTaskInfo.taskPrio, Priority::HIGH);
354     scheduler.FinalizeLastScheduleTask();
355     // normal priority target B
356     errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
357     ASSERT_EQ(errCode, E_OK);
358     EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_B);
359     EXPECT_EQ(outTaskInfo.taskPrio, Priority::NORMAL);
360     scheduler.FinalizeLastScheduleTask();
361     // normal priority target C
362     errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
363     ASSERT_EQ(errCode, E_OK);
364     EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_C);
365     EXPECT_EQ(outTaskInfo.taskPrio, Priority::NORMAL);
366     scheduler.FinalizeLastScheduleTask();
367     // low priority target A
368     errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
369     ASSERT_EQ(errCode, E_OK);
370     EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_A);
371     EXPECT_EQ(outTaskInfo.taskPrio, Priority::LOW);
372     scheduler.FinalizeLastScheduleTask();
373     // low priority target B
374     errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
375     ASSERT_EQ(errCode, E_OK);
376     EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_B);
377     EXPECT_EQ(outTaskInfo.taskPrio, Priority::LOW);
378     scheduler.FinalizeLastScheduleTask();
379 }
380 
381 /**
382  * @tc.name: Fragment 001
383  * @tc.desc: Test fragmentation in send and receive
384  * @tc.type: FUNC
385  * @tc.require:
386  * @tc.author: xiaozhenjian
387  */
388 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment001, TestSize.Level2)
389 {
390     // Preset
391     Message *recvMsgForBB = nullptr;
__anon5d3d689c0702(const std::string &srcTarget, Message *inMsg) 392     g_commBB->RegOnMessageCallback([&recvMsgForBB](const std::string &srcTarget, Message *inMsg) {
393         recvMsgForBB = inMsg;
394         return E_OK;
395     }, nullptr);
396 
397     /**
398      * @tc.steps: step1. connect device A with device B
399      */
400     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
401 
402     /**
403      * @tc.steps: step2. device A send message(registered and giant) to device B using communicator AB
404      * @tc.expected: step2. communicator BB received the message
405      */
406     const uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
407     Message *sendMsgForAB = BuildRegedGiantMessage(dataLength);
408     ASSERT_NE(sendMsgForAB, nullptr);
409     SendConfig conf = {false, false, true, 0};
410     int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsgForAB, conf);
411     EXPECT_EQ(errCode, E_OK);
412     std::this_thread::sleep_for(std::chrono::milliseconds(2600)); // Wait 2600 ms to make sure send done
413     ASSERT_NE(recvMsgForBB, nullptr);
414     ASSERT_EQ(recvMsgForBB->GetMessageId(), REGED_GIANT_MSG_ID);
415 
416     /**
417      * @tc.steps: step3. Compare received data with send data
418      * @tc.expected: step3. equal
419      */
420     Message *oriMsgForAB = BuildRegedGiantMessage(dataLength);
421     ASSERT_NE(oriMsgForAB, nullptr);
422     const RegedGiantObject *oriObjForAB = oriMsgForAB->GetObject<RegedGiantObject>();
423     ASSERT_NE(oriObjForAB, nullptr);
424     const RegedGiantObject *recvObjForBB = recvMsgForBB->GetObject<RegedGiantObject>();
425     ASSERT_NE(recvObjForBB, nullptr);
426     bool isEqual = RegedGiantObject::CheckEqual(*oriObjForAB, *recvObjForBB);
427     EXPECT_EQ(isEqual, true);
428 
429     // CleanUp
430     delete oriMsgForAB;
431     oriMsgForAB = nullptr;
432     delete recvMsgForBB;
433     recvMsgForBB = nullptr;
434     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
435 }
436 
437 /**
438  * @tc.name: Fragment 002
439  * @tc.desc: Test fragmentation in partial loss
440  * @tc.type: FUNC
441  * @tc.require:
442  * @tc.author: xiaozhenjian
443  */
444 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment002, TestSize.Level2)
445 {
446     // Preset
447     Message *recvMsgForCC = nullptr;
__anon5d3d689c0802(const std::string &srcTarget, Message *inMsg) 448     g_commCC->RegOnMessageCallback([&recvMsgForCC](const std::string &srcTarget, Message *inMsg) {
449         recvMsgForCC = inMsg;
450         return E_OK;
451     }, nullptr);
452 
453     /**
454      * @tc.steps: step1. connect device B with device C
455      */
456     AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
457     std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure quiet
458 
459     /**
460      * @tc.steps: step2. device B simulate partial loss
461      */
462     g_envDeviceB.adapterHandle->SimulateSendPartialLoss();
463 
464     /**
465      * @tc.steps: step3. device B send message(registered and giant) to device C using communicator BC
466      * @tc.expected: step3. communicator CC not receive the message
467      */
468     uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
469     Message *sendMsgForBC = BuildRegedGiantMessage(dataLength);
470     ASSERT_NE(sendMsgForBC, nullptr);
471     SendConfig conf = {false, false, true, 0};
472     int errCode = g_commBC->SendMessage(DEVICE_NAME_C, sendMsgForBC, conf);
473     EXPECT_EQ(errCode, E_OK);
474     std::this_thread::sleep_for(std::chrono::milliseconds(2600)); // Wait 2600 ms to make sure send done
475     EXPECT_EQ(recvMsgForCC, nullptr);
476 
477     /**
478      * @tc.steps: step4. device B not simulate partial loss
479      */
480     g_envDeviceB.adapterHandle->SimulateSendPartialLossClear();
481 
482     /**
483      * @tc.steps: step5. device B send message(registered and giant) to device C using communicator BC
484      * @tc.expected: step5. communicator CC received the message, the length equal to the one that is second send
485      */
486     dataLength = 17 * 1024 * 1024; // 17 MB, 1024 is scale
487     Message *resendMsgForBC = BuildRegedGiantMessage(dataLength);
488     ASSERT_NE(resendMsgForBC, nullptr);
489     errCode = g_commBC->SendMessage(DEVICE_NAME_C, resendMsgForBC, conf);
490     EXPECT_EQ(errCode, E_OK);
491     std::this_thread::sleep_for(std::chrono::milliseconds(3400)); // Wait 3400 ms to make sure send done
492     ASSERT_NE(recvMsgForCC, nullptr);
493     ASSERT_EQ(recvMsgForCC->GetMessageId(), REGED_GIANT_MSG_ID);
494     const RegedGiantObject *recvObjForCC = recvMsgForCC->GetObject<RegedGiantObject>();
495     ASSERT_NE(recvObjForCC, nullptr);
496     EXPECT_EQ(dataLength, recvObjForCC->rawData_.size());
497 
498     // CleanUp
499     delete recvMsgForCC;
500     recvMsgForCC = nullptr;
501     AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
502 }
503 
504 /**
505  * @tc.name: Fragment 003
506  * @tc.desc: Test fragmentation simultaneously
507  * @tc.type: FUNC
508  * @tc.require:
509  * @tc.author: xiaozhenjian
510  */
511 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment003, TestSize.Level3)
512 {
513     // Preset
514     std::atomic<int> count {0};
__anon5d3d689c0902(const std::string &srcTarget, Message *inMsg) 515     OnMessageCallback callback = [&count](const std::string &srcTarget, Message *inMsg) {
516         delete inMsg;
517         inMsg = nullptr;
518         count.fetch_add(1, std::memory_order_seq_cst);
519         return E_OK;
520     };
521     g_commBB->RegOnMessageCallback(callback, nullptr);
522     g_commBC->RegOnMessageCallback(callback, nullptr);
523 
524     /**
525      * @tc.steps: step1. connect device A with device B, then device B with device C
526      */
527     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
528     AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
529     std::this_thread::sleep_for(std::chrono::milliseconds(400)); // Wait 400 ms to make sure quiet
530 
531     /**
532      * @tc.steps: step2. device A and device C simulate send block
533      */
534     g_envDeviceA.adapterHandle->SimulateSendBlock();
535     g_envDeviceC.adapterHandle->SimulateSendBlock();
536 
537     /**
538      * @tc.steps: step3. device A send message(registered and giant) to device B using communicator AB
539      */
540     uint32_t dataLength = 23 * 1024 * 1024; // 23 MB, 1024 is scale
541     Message *sendMsgForAB = BuildRegedGiantMessage(dataLength);
542     ASSERT_NE(sendMsgForAB, nullptr);
543     SendConfig conf = {false, false, true, 0};
544     int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsgForAB, conf);
545     EXPECT_EQ(errCode, E_OK);
546 
547     /**
548      * @tc.steps: step4. device C send message(registered and giant) to device B using communicator CC
549      */
550     Message *sendMsgForCC = BuildRegedGiantMessage(dataLength);
551     ASSERT_NE(sendMsgForCC, nullptr);
552     errCode = g_commCC->SendMessage(DEVICE_NAME_B, sendMsgForCC, conf);
553     EXPECT_EQ(errCode, E_OK);
554 
555     /**
556      * @tc.steps: step5. device A and device C not simulate send block
557      * @tc.expected: step5. communicator BB and BV received the message
558      */
559     g_envDeviceA.adapterHandle->SimulateSendBlockClear();
560     g_envDeviceC.adapterHandle->SimulateSendBlockClear();
561     std::this_thread::sleep_for(std::chrono::milliseconds(9200)); // Wait 9200 ms to make sure send done
562     EXPECT_EQ(count, 2); // 2 combined message received
563 
564     // CleanUp
565     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
566     AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
567 }
568 
569 /**
570  * @tc.name: Fragment 004
571  * @tc.desc: Test fragmentation in send and receive when rate limit
572  * @tc.type: FUNC
573  * @tc.require:
574  * @tc.author: zhangqiquan
575  */
576 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment004, TestSize.Level2)
577 {
578     /**
579      * @tc.steps: step1. connect device A with device B
580      */
581     Message *recvMsgForBB = nullptr;
__anon5d3d689c0a02(const std::string &srcTarget, Message *inMsg) 582     g_commBB->RegOnMessageCallback([&recvMsgForBB](const std::string &srcTarget, Message *inMsg) {
583         recvMsgForBB = inMsg;
584         return E_OK;
585     }, nullptr);
586     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
587     std::atomic<int> count = 0;
__anon5d3d689c0b02() 588     g_envDeviceA.adapterHandle->ForkSendBytes([&count]() {
589         count++;
590         if (count % 3 == 0) { // retry each 3 packet
591             return -E_WAIT_RETRY;
592         }
593         return E_OK;
594     });
595     /**
596      * @tc.steps: step2. device A send message(registered and giant) to device B using communicator AB
597      * @tc.expected: step2. communicator BB received the message
598      */
599     const uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
600     Message *sendMsg = BuildRegedGiantMessage(dataLength);
601     ASSERT_NE(sendMsg, nullptr);
602     SendConfig conf = {false, false, true, 0};
603     int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsg, conf);
604     EXPECT_EQ(errCode, E_OK);
605     std::this_thread::sleep_for(std::chrono::seconds(1)); // Wait 1s to make sure send done
606     g_envDeviceA.adapterHandle->SimulateSendRetry(DEVICE_NAME_B);
607     g_envDeviceA.adapterHandle->SimulateSendRetryClear(DEVICE_NAME_B);
608     int reTryTimes = 5;
609     while (recvMsgForBB == nullptr && reTryTimes > 0) {
610         std::this_thread::sleep_for(std::chrono::seconds(3));
611         reTryTimes--;
612     }
613     ASSERT_NE(recvMsgForBB, nullptr);
614     ASSERT_EQ(recvMsgForBB->GetMessageId(), REGED_GIANT_MSG_ID);
615     /**
616      * @tc.steps: step3. Compare received data with send data
617      * @tc.expected: step3. equal
618      */
619     Message *oriMsgForAB = BuildRegedGiantMessage(dataLength);
620     ASSERT_NE(oriMsgForAB, nullptr);
621     auto *recvObjForBB = recvMsgForBB->GetObject<RegedGiantObject>();
622     ASSERT_NE(recvObjForBB, nullptr);
623     auto *oriObjForAB = oriMsgForAB->GetObject<RegedGiantObject>();
624     ASSERT_NE(oriObjForAB, nullptr);
625     bool isEqual = RegedGiantObject::CheckEqual(*oriObjForAB, *recvObjForBB);
626     EXPECT_EQ(isEqual, true);
627     g_envDeviceA.adapterHandle->ForkSendBytes(nullptr);
628 
629     // CleanUp
630     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
631     delete oriMsgForAB;
632     oriMsgForAB = nullptr;
633     delete recvMsgForBB;
634     recvMsgForBB = nullptr;
635 }
636 
637 namespace {
ClearPreviousTestCaseInfluence()638 void ClearPreviousTestCaseInfluence()
639 {
640     ReleaseAllCommunicator();
641     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
642     AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
643     AdapterStub::ConnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
644     std::this_thread::sleep_for(std::chrono::seconds(10)); // Wait 10 s to make sure all thread quiet
645     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
646     AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
647     AdapterStub::DisconnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
648     AllocAllCommunicator();
649 }
650 }
651 
652 /**
653  * @tc.name: ReliableOnline 001
654  * @tc.desc: Test device online reliability
655  * @tc.type: FUNC
656  * @tc.require:
657  * @tc.author: xiaozhenjian
658  */
659 HWTEST_F(DistributedDBCommunicatorDeepTest, ReliableOnline001, TestSize.Level2)
660 {
661     // Preset
662     ClearPreviousTestCaseInfluence();
663     std::atomic<int> count {0};
__anon5d3d689c0d02(const std::string &target, bool isConnect) 664     OnConnectCallback callback = [&count](const std::string &target, bool isConnect) {
665         if (isConnect) {
666             count.fetch_add(1, std::memory_order_seq_cst);
667         }
668     };
669     g_commAA->RegOnConnectCallback(callback, nullptr);
670     g_commAB->RegOnConnectCallback(callback, nullptr);
671     g_commBB->RegOnConnectCallback(callback, nullptr);
672     g_commBC->RegOnConnectCallback(callback, nullptr);
673     g_commCC->RegOnConnectCallback(callback, nullptr);
674     g_commCA->RegOnConnectCallback(callback, nullptr);
675 
676     /**
677      * @tc.steps: step1. device A and device B and device C simulate send total loss
678      */
679     g_envDeviceA.adapterHandle->SimulateSendTotalLoss();
680     g_envDeviceB.adapterHandle->SimulateSendTotalLoss();
681     g_envDeviceC.adapterHandle->SimulateSendTotalLoss();
682 
683     /**
684      * @tc.steps: step2. connect device A with device B, device B with device C, device C with device A
685      */
686     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
687     AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
688     AdapterStub::ConnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
689 
690     /**
691      * @tc.steps: step3. wait a long time
692      * @tc.expected: step3. no communicator received the online callback
693      */
694     std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure quiet
695     EXPECT_EQ(count, 0); // no online callback received
696 
697     /**
698      * @tc.steps: step4. device A and device B and device C not simulate send total loss
699      */
700     g_envDeviceA.adapterHandle->SimulateSendTotalLossClear();
701     g_envDeviceB.adapterHandle->SimulateSendTotalLossClear();
702     g_envDeviceC.adapterHandle->SimulateSendTotalLossClear();
703     std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure send done
704     EXPECT_EQ(count, 6); // 6 online callback received in total
705 
706     // CleanUp
707     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
708     AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
709     AdapterStub::DisconnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
710 }
711 
712 /**
713  * @tc.name: NetworkAdapter001
714  * @tc.desc: Test networkAdapter start func
715  * @tc.type: FUNC
716  * @tc.require:
717  * @tc.author: zhangqiquan
718  */
719 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter001, TestSize.Level1)
720 {
721     auto processCommunicator = std::make_shared<MockProcessCommunicator>();
722     EXPECT_CALL(*processCommunicator, Stop()).WillRepeatedly(testing::Return(OK));
723     /**
724      * @tc.steps: step1. adapter start with empty label
725      * @tc.expected: step1. start failed
726      */
727     auto adapter = std::make_shared<NetworkAdapter>("");
728     EXPECT_EQ(adapter->StartAdapter(), -E_INVALID_ARGS);
729     /**
730      * @tc.steps: step2. adapter start with not empty label but processCommunicator is null
731      * @tc.expected: step2. start failed
732      */
733     adapter = std::make_shared<NetworkAdapter>("label");
734     EXPECT_EQ(adapter->StartAdapter(), -E_INVALID_ARGS);
735     /**
736      * @tc.steps: step3. processCommunicator start not ok
737      * @tc.expected: step3. start failed
738      */
739     adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
740     EXPECT_CALL(*processCommunicator, Start).WillRepeatedly(testing::Return(DB_ERROR));
741     EXPECT_EQ(adapter->StartAdapter(), -E_PERIPHERAL_INTERFACE_FAIL);
742     /**
743      * @tc.steps: step4. processCommunicator reg not ok
744      * @tc.expected: step4. start failed
745      */
746     EXPECT_CALL(*processCommunicator, Start).WillRepeatedly(testing::Return(OK));
747     EXPECT_CALL(*processCommunicator, RegOnDataReceive).WillRepeatedly(testing::Return(DB_ERROR));
748     EXPECT_EQ(adapter->StartAdapter(), -E_PERIPHERAL_INTERFACE_FAIL);
749     EXPECT_CALL(*processCommunicator, RegOnDataReceive).WillRepeatedly(testing::Return(OK));
750     EXPECT_CALL(*processCommunicator, RegOnDeviceChange).WillRepeatedly(testing::Return(DB_ERROR));
751     EXPECT_EQ(adapter->StartAdapter(), -E_PERIPHERAL_INTERFACE_FAIL);
752     /**
753      * @tc.steps: step5. processCommunicator reg ok
754      * @tc.expected: step5. start success
755      */
756     EXPECT_CALL(*processCommunicator, RegOnDeviceChange).WillRepeatedly(testing::Return(OK));
__anon5d3d689c0e02() 757     EXPECT_CALL(*processCommunicator, GetLocalDeviceInfos).WillRepeatedly([]() {
758         DeviceInfos deviceInfos;
759         deviceInfos.identifier = "DEVICES_A"; // local is deviceA
760         return deviceInfos;
761     });
__anon5d3d689c0f02() 762     EXPECT_CALL(*processCommunicator, GetRemoteOnlineDeviceInfosList).WillRepeatedly([]() {
763         std::vector<DeviceInfos> res;
764         DeviceInfos deviceInfos;
765         deviceInfos.identifier = "DEVICES_A"; // search local is deviceA
766         res.push_back(deviceInfos);
767         deviceInfos.identifier = "DEVICES_B"; // search remote is deviceB
768         res.push_back(deviceInfos);
769         return res;
770     });
__anon5d3d689c1002(const DeviceInfos &) 771     EXPECT_CALL(*processCommunicator, IsSameProcessLabelStartedOnPeerDevice).WillRepeatedly([](const DeviceInfos &) {
772         return false;
773     });
774     EXPECT_EQ(adapter->StartAdapter(), E_OK);
775     RuntimeContext::GetInstance()->StopTaskPool();
776 }
777 
778 /**
779  * @tc.name: NetworkAdapter002
780  * @tc.desc: Test networkAdapter get mtu func
781  * @tc.type: FUNC
782  * @tc.require:
783  * @tc.author: zhangqiquan
784  */
785 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter002, TestSize.Level1)
786 {
787     auto processCommunicator = std::make_shared<MockProcessCommunicator>();
788     auto adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
789     /**
790      * @tc.steps: step1. processCommunicator return 0 mtu
791      * @tc.expected: step1. adapter will adjust to min mtu
792      */
__anon5d3d689c1102() 793     EXPECT_CALL(*processCommunicator, GetMtuSize).WillRepeatedly([]() {
794         return 0u;
795     });
796     EXPECT_EQ(adapter->GetMtuSize(), DBConstant::MIN_MTU_SIZE);
797     /**
798      * @tc.steps: step2. processCommunicator return 2 max mtu
799      * @tc.expected: step2. adapter will return min mtu util re make
800      */
__anon5d3d689c1202() 801     EXPECT_CALL(*processCommunicator, GetMtuSize).WillRepeatedly([]() {
802         return 2 * DBConstant::MAX_MTU_SIZE;
803     });
804     EXPECT_EQ(adapter->GetMtuSize(), DBConstant::MIN_MTU_SIZE);
805     adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
806     EXPECT_EQ(adapter->GetMtuSize(), DBConstant::MAX_MTU_SIZE);
807 }
808 
809 /**
810  * @tc.name: NetworkAdapter003
811  * @tc.desc: Test networkAdapter get timeout func
812  * @tc.type: FUNC
813  * @tc.require:
814  * @tc.author: zhangqiquan
815  */
816 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter003, TestSize.Level1)
817 {
818     auto processCommunicator = std::make_shared<MockProcessCommunicator>();
819     auto adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
820     /**
821      * @tc.steps: step1. processCommunicator return 0 timeout
822      * @tc.expected: step1. adapter will adjust to min timeout
823      */
__anon5d3d689c1302() 824     EXPECT_CALL(*processCommunicator, GetTimeout).WillRepeatedly([]() {
825         return 0u;
826     });
827     EXPECT_EQ(adapter->GetTimeout(), DBConstant::MIN_TIMEOUT);
828     /**
829      * @tc.steps: step2. processCommunicator return 2 max timeout
830      * @tc.expected: step2. adapter will adjust to max timeout
831      */
__anon5d3d689c1402() 832     EXPECT_CALL(*processCommunicator, GetTimeout).WillRepeatedly([]() {
833         return 2 * DBConstant::MAX_TIMEOUT;
834     });
835     EXPECT_EQ(adapter->GetTimeout(), DBConstant::MAX_TIMEOUT);
836 }
837 
838 /**
839  * @tc.name: NetworkAdapter004
840  * @tc.desc: Test networkAdapter send bytes func
841  * @tc.type: FUNC
842  * @tc.require:
843  * @tc.author: zhangqiquan
844  */
845 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter004, TestSize.Level1)
846 {
847     auto processCommunicator = std::make_shared<MockProcessCommunicator>();
848     auto adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
849 
__anon5d3d689c1502(const DeviceInfos &, const uint8_t *, uint32_t) 850     EXPECT_CALL(*processCommunicator, SendData).WillRepeatedly([](const DeviceInfos &, const uint8_t *, uint32_t) {
851         return OK;
852     });
853     /**
854      * @tc.steps: step1. adapter send data with error param
855      * @tc.expected: step1. adapter send failed
856      */
857     auto data = std::make_shared<uint8_t>(1u);
858     EXPECT_EQ(adapter->SendBytes("DEVICES_B", nullptr, 1, 0), -E_INVALID_ARGS);
859     EXPECT_EQ(adapter->SendBytes("DEVICES_B", data.get(), 0, 0), -E_INVALID_ARGS);
860     /**
861      * @tc.steps: step2. adapter send data with right param
862      * @tc.expected: step2. adapter send ok
863      */
864     EXPECT_EQ(adapter->SendBytes("DEVICES_B", data.get(), 1, 0), E_OK);
865     RuntimeContext::GetInstance()->StopTaskPool();
866 }
867 
868 namespace {
InitAdapter(const std::shared_ptr<NetworkAdapter> & adapter,const std::shared_ptr<MockProcessCommunicator> & processCommunicator,OnDataReceive & onDataReceive,OnDeviceChange & onDataChange)869 void InitAdapter(const std::shared_ptr<NetworkAdapter> &adapter,
870     const std::shared_ptr<MockProcessCommunicator> &processCommunicator,
871     OnDataReceive &onDataReceive, OnDeviceChange &onDataChange)
872 {
873     EXPECT_CALL(*processCommunicator, Stop).WillRepeatedly([]() {
874         return OK;
875     });
876     EXPECT_CALL(*processCommunicator, Start).WillRepeatedly([](const std::string &) {
877         return OK;
878     });
879     EXPECT_CALL(*processCommunicator, RegOnDataReceive).WillRepeatedly(
880         [&onDataReceive](const OnDataReceive &callback) {
881             onDataReceive = callback;
882             return OK;
883     });
884     EXPECT_CALL(*processCommunicator, RegOnDeviceChange).WillRepeatedly(
885         [&onDataChange](const OnDeviceChange &callback) {
886             onDataChange = callback;
887             return OK;
888     });
889     EXPECT_CALL(*processCommunicator, GetRemoteOnlineDeviceInfosList).WillRepeatedly([]() {
890         std::vector<DeviceInfos> res;
891         return res;
892     });
893     EXPECT_CALL(*processCommunicator, IsSameProcessLabelStartedOnPeerDevice).WillRepeatedly([](const DeviceInfos &) {
894         return false;
895     });
896     EXPECT_EQ(adapter->StartAdapter(), E_OK);
897 }
898 }
899 /**
900  * @tc.name: NetworkAdapter005
901  * @tc.desc: Test networkAdapter receive data func
902  * @tc.type: FUNC
903  * @tc.require:
904  * @tc.author: zhangqiquan
905  */
906 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter005, TestSize.Level1)
907 {
908     auto processCommunicator = std::make_shared<MockProcessCommunicator>();
909     auto adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
910     OnDataReceive onDataReceive;
911     OnDeviceChange onDeviceChange;
912     InitAdapter(adapter, processCommunicator, onDataReceive, onDeviceChange);
913     ASSERT_NE(onDataReceive, nullptr);
914     /**
915      * @tc.steps: step1. adapter recv data with error param
916      */
917     auto data = std::make_shared<uint8_t>(1);
918     DeviceInfos deviceInfos;
919     onDataReceive(deviceInfos, nullptr, 1);
920     onDataReceive(deviceInfos, data.get(), 0);
921     /**
922      * @tc.steps: step2. adapter recv data with no permission
923      */
__anon5d3d689c1d02(DataHeadInfo, uint32_t &) 924     EXPECT_CALL(*processCommunicator, GetDataHeadInfo).WillRepeatedly([](DataHeadInfo, uint32_t &) {
925         return NO_PERMISSION;
926     });
927     onDataReceive(deviceInfos, data.get(), 1);
__anon5d3d689c1e02(DataHeadInfo, uint32_t &) 928     EXPECT_CALL(*processCommunicator, GetDataHeadInfo).WillRepeatedly([](DataHeadInfo, uint32_t &) {
929         return OK;
930     });
931     EXPECT_CALL(*processCommunicator, GetDataUserInfo).WillRepeatedly(
__anon5d3d689c1f02(DataUserInfo, std::vector<UserInfo> &userInfos) 932         [](DataUserInfo, std::vector<UserInfo> &userInfos) {
933             UserInfo userId = {"1"};
934             userInfos.emplace_back(userId);
935             return OK;
936     });
937     /**
938      * @tc.steps: step3. adapter recv data with no callback
939      */
940     onDataReceive(deviceInfos, data.get(), 1);
__anon5d3d689c2002(const ReceiveBytesInfo &, const DataUserInfoProc &) 941     adapter->RegBytesReceiveCallback([](const ReceiveBytesInfo &, const DataUserInfoProc &) {
942     }, nullptr);
943     onDataReceive(deviceInfos, data.get(), 1);
944     RuntimeContext::GetInstance()->StopTaskPool();
945 }
946 
947 /**
948  * @tc.name: NetworkAdapter006
949  * @tc.desc: Test networkAdapter device change func
950  * @tc.type: FUNC
951  * @tc.require:
952  * @tc.author: zhangqiquan
953  */
954 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter006, TestSize.Level1)
955 {
956     auto processCommunicator = std::make_shared<MockProcessCommunicator>();
957     auto adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
958     OnDataReceive onDataReceive;
959     OnDeviceChange onDeviceChange;
960     InitAdapter(adapter, processCommunicator, onDataReceive, onDeviceChange);
961     ASSERT_NE(onDeviceChange, nullptr);
962     DeviceInfos deviceInfos;
963     /**
964      * @tc.steps: step1. onDeviceChange with no same process
965      */
966     onDeviceChange(deviceInfos, true);
967     /**
968      * @tc.steps: step2. onDeviceChange with same process
969      */
__anon5d3d689c2102(const DeviceInfos &) 970     EXPECT_CALL(*processCommunicator, IsSameProcessLabelStartedOnPeerDevice).WillRepeatedly([](const DeviceInfos &) {
971         return true;
972     });
973     onDeviceChange(deviceInfos, true);
__anon5d3d689c2202(const std::string &, bool) 974     adapter->RegTargetChangeCallback([](const std::string &, bool) {
975     }, nullptr);
976     onDeviceChange(deviceInfos, false);
977     /**
978      * @tc.steps: step3. adapter send data with db_error
979      * @tc.expected: step3. adapter send failed
980      */
981     onDeviceChange(deviceInfos, true);
__anon5d3d689c2302(const DeviceInfos &, const uint8_t *, uint32_t) 982     EXPECT_CALL(*processCommunicator, SendData).WillRepeatedly([](const DeviceInfos &, const uint8_t *, uint32_t) {
983         return DB_ERROR;
984     });
__anon5d3d689c2402(const DeviceInfos &) 985     EXPECT_CALL(*processCommunicator, IsSameProcessLabelStartedOnPeerDevice).WillRepeatedly([](const DeviceInfos &) {
986         return false;
987     });
988     auto data = std::make_shared<uint8_t>(1);
989     EXPECT_EQ(adapter->SendBytes("", data.get(), 1, 0), static_cast<int>(DB_ERROR));
990     RuntimeContext::GetInstance()->StopTaskPool();
991     EXPECT_EQ(adapter->IsDeviceOnline(""), false);
992     ExtendInfo info;
993     EXPECT_EQ(adapter->GetExtendHeaderHandle(info), nullptr);
994 }
995 
996 /**
997  * @tc.name: NetworkAdapter007
998  * @tc.desc: Test networkAdapter recv invalid head length
999  * @tc.type: FUNC
1000  * @tc.require:
1001  * @tc.author: zhangqiquan
1002  */
1003 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter007, TestSize.Level1)
1004 {
1005     auto processCommunicator = std::make_shared<MockProcessCommunicator>();
1006     auto adapter = std::make_shared<NetworkAdapter>("NetworkAdapter007", processCommunicator);
1007     OnDataReceive onDataReceive;
1008     OnDeviceChange onDeviceChange;
1009     InitAdapter(adapter, processCommunicator, onDataReceive, onDeviceChange);
1010     ASSERT_NE(onDeviceChange, nullptr);
1011     /**
1012      * @tc.steps: step1. GetDataHeadInfo return invalid headLen
1013      * @tc.expected: step1. adapter check this len
1014      */
__anon5d3d689c2502(DataHeadInfo, uint32_t &headLen) 1015     EXPECT_CALL(*processCommunicator, GetDataHeadInfo).WillOnce([](DataHeadInfo, uint32_t &headLen) {
1016         headLen = UINT32_MAX;
1017         return OK;
1018     });
1019     /**
1020      * @tc.steps: step2. Adapter ignore data because len is too large
1021      * @tc.expected: step2. BytesReceive never call
1022      */
1023     int callByteReceiveCount = 0;
1024     int res = adapter->RegBytesReceiveCallback([&callByteReceiveCount](const ReceiveBytesInfo &,
__anon5d3d689c2602(const ReceiveBytesInfo &, const DataUserInfoProc &) 1025         const DataUserInfoProc &) {
1026             LOGD("callByteReceiveCount++;");
1027         callByteReceiveCount++;
1028     }, nullptr);
1029     EXPECT_EQ(res, E_OK);
1030     std::vector<uint8_t> data = { 1u };
1031     DeviceInfos deviceInfos;
1032     onDataReceive(deviceInfos, data.data(), 1u);
1033     LOGD("callByteReceiveCount++%d;", callByteReceiveCount);
1034     EXPECT_EQ(callByteReceiveCount, 0);
1035 }
1036 
1037 /**
1038  * @tc.name: RetrySendExceededLimit001
1039  * @tc.desc: Test send result when the number of retry times exceeds the limit
1040  * @tc.type: FUNC
1041  * @tc.require:
1042  * @tc.author: suyue
1043  */
1044 HWTEST_F(DistributedDBCommunicatorDeepTest, RetrySendExceededLimit001, TestSize.Level2)
1045 {
1046     /**
1047      * @tc.steps: step1. connect device A with device B and fork SendBytes
1048      * @tc.expected: step1. operation OK
1049      */
1050     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
1051     std::atomic<int> count = 0;
__anon5d3d689c2702() 1052     g_envDeviceA.adapterHandle->ForkSendBytes([&count]() {
1053         count++;
1054         return -E_WAIT_RETRY;
1055     });
1056 
1057     /**
1058      * @tc.steps: step2. the number of retry times for device A to send a message exceeds the limit
1059      * @tc.expected: step2. sendResult fail
1060      */
1061     std::vector<std::pair<int, bool>> sendResult;
__anon5d3d689c2802(int result, bool isDirectEnd) 1062     auto sendResultNotifier = [&sendResult](int result, bool isDirectEnd) {
1063         sendResult.push_back(std::pair<int, bool>(result, isDirectEnd));
1064     };
1065     const uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
1066     Message *sendMsg = BuildRegedGiantMessage(dataLength);
1067     ASSERT_NE(sendMsg, nullptr);
1068     SendConfig conf = {false, false, true, 0};
1069     int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsg, conf, sendResultNotifier);
1070     EXPECT_EQ(errCode, E_OK);
1071     std::this_thread::sleep_for(std::chrono::seconds(1)); // Wait 1s to make sure send done
1072     g_envDeviceA.adapterHandle->SimulateSendRetry(DEVICE_NAME_B);
1073     g_envDeviceA.adapterHandle->SimulateSendRetryClear(DEVICE_NAME_B, -E_BASE);
1074     int reTryTimes = 5;
1075     while ((count < 4) && (reTryTimes > 0)) { // Wait to make sure retry exceeds the limit
1076         std::this_thread::sleep_for(std::chrono::seconds(3));
1077         reTryTimes--;
1078     }
1079     ASSERT_EQ(sendResult.size(), static_cast<size_t>(1)); // only one callback result notification
1080     EXPECT_EQ(sendResult[0].first, -E_BASE); // index 0 retry fail
1081     EXPECT_EQ(sendResult[0].second, false);
1082 
1083     g_envDeviceA.adapterHandle->ForkSendBytes(nullptr);
1084     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
1085 }
1086 
1087 /**
1088  * @tc.name: RetrySendExceededLimit002
1089  * @tc.desc: Test multi thread call SendableCallback when the number of retry times exceeds the limit
1090  * @tc.type: FUNC
1091  * @tc.require:
1092  * @tc.author: suyue
1093  */
1094 HWTEST_F(DistributedDBCommunicatorDeepTest, RetrySendExceededLimit002, TestSize.Level2)
1095 {
1096     /**
1097      * @tc.steps: step1. DeviceA send SendMessage and set SendBytes interface return -E_WAIT_RETRY
1098      * @tc.expected: step1. Send ok
1099      */
1100     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
1101     std::atomic<int> count = 0;
__anon5d3d689c2902() 1102     g_envDeviceA.adapterHandle->ForkSendBytes([&count]() {
1103         count++;
1104         return -E_WAIT_RETRY;
1105     });
1106     std::vector<std::pair<int, bool>> sendResult;
__anon5d3d689c2a02(int result, bool isDirectEnd) 1107     auto sendResultNotifier = [&sendResult](int result, bool isDirectEnd) {
1108         sendResult.push_back(std::pair<int, bool>(result, isDirectEnd));
1109     };
1110     const uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
1111     Message *sendMsg = BuildRegedGiantMessage(dataLength);
1112     ASSERT_NE(sendMsg, nullptr);
1113     SendConfig conf = {false, false, true, 0};
1114     EXPECT_EQ(g_commAB->SendMessage(DEVICE_NAME_B, sendMsg, conf, sendResultNotifier), E_OK);
1115     std::this_thread::sleep_for(std::chrono::seconds(1)); // Wait 1s to make sure send done
1116 
1117     /**
1118      * @tc.steps: step2. Triggering multi thread call SendableCallback interface and set errorCode
1119      * @tc.expected: step2. Callback success
1120      */
1121     std::vector<std::thread> threads;
1122     int threadNum = 3;
1123     threads.reserve(threadNum);
1124     for (int n = 0; n < threadNum; n++) {
__anon5d3d689c2b02() 1125         threads.emplace_back([&]() {
1126             g_envDeviceA.adapterHandle->SimulateTriggerSendableCallback(DEVICE_NAME_B, -E_BASE);
1127         });
1128     }
1129     for (std::thread &t : threads) {
1130         t.join();
1131     }
1132 
1133     /**
1134      * @tc.steps: step3. Make The number of messages sent by device A exceed the limit
1135      * @tc.expected: step3. SendResult is the errorCode set by SendableCallback interface
1136      */
1137     int reTryTimes = 5;
1138     while ((count < 4) && (reTryTimes > 0)) {
1139         std::this_thread::sleep_for(std::chrono::seconds(3));
1140         reTryTimes--;
1141     }
1142     ASSERT_EQ(sendResult.size(), static_cast<size_t>(1));
1143     EXPECT_EQ(sendResult[0].first, -E_BASE);
1144     EXPECT_EQ(sendResult[0].second, false);
1145     g_envDeviceA.adapterHandle->ForkSendBytes(nullptr);
1146     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
1147 }
1148 
1149 /**
1150  * @tc.name: AllocBufferByPayloadLengthTest001
1151  * @tc.desc: Test AllocBufferByPayloadLength func
1152  * @tc.type: FUNC
1153  * @tc.require:
1154  * @tc.author: tiansimiao
1155  */
1156 HWTEST_F(DistributedDBCommunicatorDeepTest, AllocBufferByPayloadLengthTest001, TestSize.Level2)
1157 {
1158     SerialBuffer buffer;
1159     EXPECT_EQ(buffer.AllocBufferByPayloadLength(100, 20), E_OK);
1160     EXPECT_EQ(buffer.AllocBufferByPayloadLength(100, 20), -E_NOT_PERMIT);
1161 }
1162 
1163 /**
1164  * @tc.name: AllocBufferByPayloadLengthTest002
1165  * @tc.desc: Test AllocBufferByPayloadLength func
1166  * @tc.type: FUNC
1167  * @tc.require:
1168  * @tc.author: tiansimiao
1169  */
1170 HWTEST_F(DistributedDBCommunicatorDeepTest, AllocBufferByPayloadLengthTest002, TestSize.Level2)
1171 {
1172     SerialBuffer buffer;
1173     uint32_t payloadLen = INT32_MAX - 1;
1174     uint32_t headerLen = 10;
1175     EXPECT_EQ(buffer.AllocBufferByPayloadLength(payloadLen, headerLen), -E_INVALID_ARGS);
1176 }
1177 
1178 /**
1179  * @tc.name: AllocBufferByPayloadLengthTest003
1180  * @tc.desc: Test AllocBufferByPayloadLength func
1181  * @tc.type: FUNC
1182  * @tc.require:
1183  * @tc.author: tiansimiao
1184  */
1185 HWTEST_F(DistributedDBCommunicatorDeepTest, AllocBufferByPayloadLengthTest003, TestSize.Level2)
1186 {
1187     SerialBuffer buffer;
1188     const uint8_t externalBuff[100] = {0};
1189     int ret = buffer.SetExternalBuff(externalBuff, 100, 20);
1190     EXPECT_EQ(E_OK, ret);
1191     EXPECT_EQ(buffer.AllocBufferByPayloadLength(100, 20), -E_NOT_PERMIT);
1192 }
1193 
1194 /**
1195  * @tc.name: AllocBufferByTotalLengthTest001
1196  * @tc.desc: Test AllocBufferByPayloadLength func
1197  * @tc.type: FUNC
1198  * @tc.require:
1199  * @tc.author: tiansimiao
1200  */
1201 HWTEST_F(DistributedDBCommunicatorDeepTest, AllocBufferByTotalLengthTest001, TestSize.Level2)
1202 {
1203     SerialBuffer buffer;
1204     uint32_t payloadLen = 100;
1205     uint32_t headerLen = 20;
1206     buffer.AllocBufferByPayloadLength(payloadLen, headerLen);
1207     EXPECT_EQ(buffer.AllocBufferByTotalLength(100, 20), -E_NOT_PERMIT);
1208 }
1209 
1210 /**
1211  * @tc.name: AllocBufferByTotalLengthTest002
1212  * @tc.desc: Test AllocBufferByPayloadLength func
1213  * @tc.type: FUNC
1214  * @tc.require:
1215  * @tc.author: tiansimiao
1216  */
1217 HWTEST_F(DistributedDBCommunicatorDeepTest, AllocBufferByTotalLengthTest002, TestSize.Level2)
1218 {
1219     SerialBuffer buffer;
1220     const uint8_t* externalBuff = new(std::nothrow) uint8_t[100];
1221     ASSERT_NE(externalBuff, nullptr);
1222     uint8_t tempArray[100] = {0};
1223     EXPECT_EQ(memcpy_s(const_cast<uint8_t*>(externalBuff), 100, tempArray, 100), E_OK);
1224     buffer.SetExternalBuff(externalBuff, 100, 20);
1225     EXPECT_EQ(buffer.AllocBufferByTotalLength(100, 20), -E_NOT_PERMIT);
1226     delete[] externalBuff;
1227     externalBuff = nullptr;
1228 }
1229 
1230 /**
1231  * @tc.name: AllocBufferByTotalLengthTest003
1232  * @tc.desc: Test AllocBufferByPayloadLength func
1233  * @tc.type: FUNC
1234  * @tc.require:
1235  * @tc.author: tiansimiao
1236  */
1237 HWTEST_F(DistributedDBCommunicatorDeepTest, AllocBufferByTotalLengthTest003, TestSize.Level2)
1238 {
1239     SerialBuffer buffer;
1240     EXPECT_EQ(buffer.AllocBufferByTotalLength(0, 20), -E_INVALID_ARGS);
1241     EXPECT_EQ(buffer.AllocBufferByTotalLength(MAX_TOTAL_LEN + 1, 20), -E_INVALID_ARGS);
1242     EXPECT_EQ(buffer.AllocBufferByTotalLength(5, 10), -E_INVALID_ARGS);
1243 }
1244 
1245 /**
1246  * @tc.name: SetExternalBuffTest001
1247  * @tc.desc: Test SetExternalBuff func
1248  * @tc.type: FUNC
1249  * @tc.require:
1250  * @tc.author: tiansimiao
1251  */
1252 HWTEST_F(DistributedDBCommunicatorDeepTest, SetExternalBuffTest001, TestSize.Level2)
1253 {
1254     SerialBuffer buffer;
1255     uint32_t payloadLen = 100;
1256     uint32_t headerLen = 20;
1257     buffer.AllocBufferByPayloadLength(payloadLen, headerLen);
1258     const uint8_t* externalBuff = new(std::nothrow) uint8_t[100];
1259     ASSERT_NE(externalBuff, nullptr);
1260     EXPECT_EQ(buffer.SetExternalBuff(externalBuff, 100, 20), -E_NOT_PERMIT);
1261     delete[] externalBuff;
1262     externalBuff = nullptr;
1263 }
1264 
1265 /**
1266  * @tc.name: SetExternalBuffTest002
1267  * @tc.desc: Test SetExternalBuff func
1268  * @tc.type: FUNC
1269  * @tc.require:
1270  * @tc.author: tiansimiao
1271  */
1272 HWTEST_F(DistributedDBCommunicatorDeepTest, SetExternalBuffTest002, TestSize.Level2)
1273 {
1274     SerialBuffer buffer;
1275     const uint8_t* externalBuff = new(std::nothrow) uint8_t[100];
1276     ASSERT_NE(externalBuff, nullptr);
1277     buffer.SetExternalBuff(externalBuff, 100, 20);
1278     EXPECT_EQ(buffer.SetExternalBuff(externalBuff, 100, 20), -E_NOT_PERMIT);
1279     delete[] externalBuff;
1280     externalBuff = nullptr;
1281 }
1282 
1283 /**
1284  * @tc.name: SetExternalBuffTest003
1285  * @tc.desc: Test SetExternalBuff func
1286  * @tc.type: FUNC
1287  * @tc.require:
1288  * @tc.author: tiansimiao
1289  */
1290 HWTEST_F(DistributedDBCommunicatorDeepTest, SetExternalBuffTest003, TestSize.Level2)
1291 {
1292     SerialBuffer buffer;
1293     EXPECT_EQ(buffer.SetExternalBuff(nullptr, 100, 20), -E_INVALID_ARGS);
1294     const uint8_t* externalBuff = new(std::nothrow) uint8_t[100];
1295     ASSERT_NE(externalBuff, nullptr);
1296     uint8_t tempArrayA[100] = {0};
1297     EXPECT_EQ(memcpy_s(const_cast<uint8_t*>(externalBuff), 100, tempArrayA, 100), E_OK);
1298     EXPECT_EQ(buffer.SetExternalBuff(externalBuff, 0, 20), -E_INVALID_ARGS);
1299     delete[] externalBuff;
1300     externalBuff = nullptr;
1301     externalBuff = new(std::nothrow) uint8_t[MAX_TOTAL_LEN + 1];
1302     ASSERT_NE(externalBuff, nullptr);
1303     EXPECT_EQ(buffer.SetExternalBuff(externalBuff, MAX_TOTAL_LEN + 1, 20), -E_INVALID_ARGS);
1304     delete[] externalBuff;
1305     externalBuff = nullptr;
1306     externalBuff = new(std::nothrow) uint8_t[10];
1307     ASSERT_NE(externalBuff, nullptr);
1308     uint8_t tempArrayB[10] = {0};
1309     EXPECT_EQ(memcpy_s(const_cast<uint8_t*>(externalBuff), 10, tempArrayB, 10), E_OK);
1310     EXPECT_EQ(buffer.SetExternalBuff(externalBuff, 5, 10), -E_INVALID_ARGS);
1311     delete[] externalBuff;
1312     externalBuff = nullptr;
1313 }
1314 
1315 /**
1316  * @tc.name: SerialBufferCloneTest001
1317  * @tc.desc: Test invalid args of Clone function
1318  * @tc.type: FUNC
1319  * @tc.require:
1320  * @tc.author: tiansimiao
1321  */
1322 HWTEST_F(DistributedDBCommunicatorDeepTest, SerialBufferCloneTest001, TestSize.Level2)
1323 {
1324     SerialBuffer buffer;
1325     int errorNo = 0;
1326     SerialBuffer* clone_ = buffer.Clone(errorNo);
1327     EXPECT_EQ(clone_, nullptr);
1328     EXPECT_EQ(errorNo, -E_INVALID_ARGS);
1329 }
1330 
1331 /**
1332  * @tc.name: ConvertForCrossThreadTest001
1333  * @tc.desc: Test invalid args of ConvertForCrossThread function
1334  * @tc.type: FUNC
1335  * @tc.require:
1336  * @tc.author: tiansimiao
1337  */
1338 HWTEST_F(DistributedDBCommunicatorDeepTest, ConvertForCrossThreadTest001, TestSize.Level2)
1339 {
1340     SerialBuffer buffer;
1341     EXPECT_EQ(buffer.ConvertForCrossThread(), -E_INVALID_ARGS);
1342 }
1343 
1344 /**
1345  * @tc.name: GetSizeTest001
1346  * @tc.desc: Test GetSize function
1347  * @tc.type: FUNC
1348  * @tc.require:
1349  * @tc.author: tiansimiao
1350  */
1351 HWTEST_F(DistributedDBCommunicatorDeepTest, GetSizeTest001, TestSize.Level2)
1352 {
1353     SerialBuffer buffer;
1354     EXPECT_EQ(buffer.GetSize(), 0);
1355 }
1356 
1357 /**
1358  * @tc.name: GetWritableBytesTest001
1359  * @tc.desc: Test GetWritableBytesForEntireBuffer and EntireFrame and Header and Payload function
1360  * @tc.type: FUNC
1361  * @tc.require:
1362  * @tc.author: tiansimiao
1363  */
1364 HWTEST_F(DistributedDBCommunicatorDeepTest, GetWritableBytesTest001, TestSize.Level2)
1365 {
1366     SerialBuffer buffer;
1367     EXPECT_EQ(buffer.GetWritableBytesForEntireBuffer().first, nullptr);
1368     EXPECT_EQ(buffer.GetWritableBytesForEntireBuffer().second, 0);
1369     EXPECT_EQ(buffer.GetWritableBytesForEntireFrame().first, nullptr);
1370     EXPECT_EQ(buffer.GetWritableBytesForEntireFrame().second, 0);
1371     EXPECT_EQ(buffer.GetWritableBytesForHeader().first, nullptr);
1372     EXPECT_EQ(buffer.GetWritableBytesForHeader().second, 0);
1373     EXPECT_EQ(buffer.GetWritableBytesForPayload().first, nullptr);
1374     EXPECT_EQ(buffer.GetWritableBytesForPayload().second, 0);
1375 }
1376 
1377 /**
1378  * @tc.name: GetWritableBytesTest002
1379  * @tc.desc: Test GetWritableBytesForEntireBuffer function
1380  * @tc.type: FUNC
1381  * @tc.require:
1382  * @tc.author: tiansimiao
1383  */
1384 HWTEST_F(DistributedDBCommunicatorDeepTest, GetWritableBytesTest002, TestSize.Level2)
1385 {
1386     SerialBuffer buffer;
1387     uint32_t payloadLen = 100;
1388     uint32_t headerLen = 20;
1389     EXPECT_EQ(buffer.AllocBufferByPayloadLength(payloadLen, headerLen), E_OK);
1390     EXPECT_NE(buffer.GetWritableBytesForEntireBuffer().first, nullptr);
1391     EXPECT_EQ(buffer.GetWritableBytesForEntireBuffer().second, buffer.GetSize());
1392 }
1393 
1394 /**
1395  * @tc.name: GetReadOnlyBytesTest001
1396  * @tc.desc: Test GetReadOnlyBytesForEntireBuffer and EntireFrame and Header and Payload function
1397  * @tc.type: FUNC
1398  * @tc.require:
1399  * @tc.author: tiansimiao
1400  */
1401 HWTEST_F(DistributedDBCommunicatorDeepTest, GetReadOnlyBytesTest001, TestSize.Level2)
1402 {
1403     SerialBuffer buffer;
1404     EXPECT_EQ(buffer.GetReadOnlyBytesForEntireBuffer().first, nullptr);
1405     EXPECT_EQ(buffer.GetReadOnlyBytesForEntireBuffer().second, 0);
1406     EXPECT_EQ(buffer.GetReadOnlyBytesForEntireFrame().first, nullptr);
1407     EXPECT_EQ(buffer.GetReadOnlyBytesForEntireFrame().second, 0);
1408     EXPECT_EQ(buffer.GetReadOnlyBytesForHeader().first, nullptr);
1409     EXPECT_EQ(buffer.GetReadOnlyBytesForHeader().second, 0);
1410     EXPECT_EQ(buffer.GetReadOnlyBytesForPayload().first, nullptr);
1411     EXPECT_EQ(buffer.GetReadOnlyBytesForPayload().second, 0);
1412 }
1413 
1414 /**
1415  * @tc.name: GetReadOnlyBytesTest002
1416  * @tc.desc: Test GetReadOnlyBytesForHeader function
1417  * @tc.type: FUNC
1418  * @tc.require:
1419  * @tc.author: tiansimiao
1420  */
1421 HWTEST_F(DistributedDBCommunicatorDeepTest, GetReadOnlyBytesTest002, TestSize.Level2)
1422 {
1423     SerialBuffer buffer;
1424     uint32_t payloadLen = 100;
1425     uint32_t headerLen = 20;
1426     EXPECT_EQ(buffer.AllocBufferByPayloadLength(payloadLen, headerLen), E_OK);
1427     EXPECT_NE(buffer.GetReadOnlyBytesForHeader().first, nullptr);
1428     EXPECT_NE(buffer.GetReadOnlyBytesForHeader().second, 0);
1429 }
1430 
1431 /**
1432  * @tc.name: DoOnSendEndByTaskIfNeedTest001
1433  * @tc.desc: Test DoOnSendEndByTaskIfNeed function
1434  * @tc.type: FUNC
1435  * @tc.require:
1436  * @tc.author: tiansimiao
1437  */
1438 HWTEST_F(DistributedDBCommunicatorDeepTest, DoOnSendEndByTaskIfNeedTest001, TestSize.Level1)
1439 {
1440     std::string dstTarget = DEVICE_NAME_B;
1441     FrameType inType = FrameType::APPLICATION_MESSAGE;
1442     TaskConfig config;
1443     config.nonBlock = true;
1444     config.isRetryTask = false;
1445     config.timeout = 1000;
1446     OnSendEnd onEnd = nullptr;
1447     const std::shared_ptr<DBStatusAdapter> statusAdapter = std::make_shared<DBStatusAdapter>();
1448     ASSERT_NE(statusAdapter, nullptr);
1449     auto adapterStub = std::make_shared<AdapterStub>("");
1450     IAdapter *adapterPtr = adapterStub.get();
1451     ASSERT_NE(adapterPtr, nullptr);
1452     auto aggregator = std::make_unique<CommunicatorAggregator>();
1453     ASSERT_NE(aggregator, nullptr);
1454     EXPECT_EQ(aggregator->Initialize(adapterPtr, statusAdapter), E_OK);
1455     DistributedDB::SerialBuffer *inBuff = new (std::nothrow) SerialBuffer();
1456     ASSERT_NE(inBuff, nullptr);
1457     EXPECT_EQ(aggregator->ScheduleSendTask(dstTarget, inBuff, inType, config, onEnd), E_OK);
1458     inBuff = nullptr; // inBuff was deleted in ScheduleSendTask func
1459     aggregator->Finalize();
1460 }
1461 
1462 /**
1463  * @tc.name: DoOnSendEndByTaskIfNeedTest002
1464  * @tc.desc: Test DoOnSendEndByTaskIfNeed function
1465  * @tc.type: FUNC
1466  * @tc.require:
1467  * @tc.author: tiansimiao
1468  */
1469 HWTEST_F(DistributedDBCommunicatorDeepTest, DoOnSendEndByTaskIfNeedTest002, TestSize.Level1)
1470 {
1471     std::string dstTarget = DEVICE_NAME_B;
1472     FrameType inType = FrameType::APPLICATION_MESSAGE;
1473     TaskConfig config;
1474     config.nonBlock = true;
1475     config.isRetryTask = false;
1476     config.timeout = 1000;
__anon5d3d689c2c02(int result, bool isDirectEnd) 1477     OnSendEnd onEnd = [](int result, bool isDirectEnd) {
1478         LOGD("OnSendEnd called with result: %d, isDirectEnd: %d", result, isDirectEnd);
1479     };
1480     const std::shared_ptr<DBStatusAdapter> statusAdapter = std::make_shared<DBStatusAdapter>();
1481     ASSERT_NE(statusAdapter, nullptr);
1482     auto adapterStub = std::make_shared<AdapterStub>("");
1483     IAdapter *adapterPtr = adapterStub.get();
1484     ASSERT_NE(adapterPtr, nullptr);
1485     auto aggregator = std::make_unique<CommunicatorAggregator>();
1486     ASSERT_NE(aggregator, nullptr);
1487     EXPECT_EQ(aggregator->Initialize(adapterPtr, statusAdapter), E_OK);
1488     DistributedDB::SerialBuffer *inBuff = new (std::nothrow) SerialBuffer();
1489     ASSERT_NE(inBuff, nullptr);
1490     EXPECT_EQ(aggregator->ScheduleSendTask(dstTarget, inBuff, inType, config, onEnd), E_OK);
1491     inBuff = nullptr; // inBuff was deleted in ScheduleSendTask func
1492     aggregator->Finalize();
1493 }
1494