1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17 #include <gmock/gmock.h>
18 #include <new>
19 #include <thread>
20 #include "db_errno.h"
21 #include "distributeddb_communicator_common.h"
22 #include "distributeddb_tools_unit_test.h"
23 #include "log_print.h"
24 #include "network_adapter.h"
25 #include "message.h"
26 #include "mock_process_communicator.h"
27 #include "serial_buffer.h"
28
29 using namespace std;
30 using namespace testing::ext;
31 using namespace DistributedDB;
32
33 namespace {
34 EnvHandle g_envDeviceA;
35 EnvHandle g_envDeviceB;
36 EnvHandle g_envDeviceC;
37 ICommunicator *g_commAA = nullptr;
38 ICommunicator *g_commAB = nullptr;
39 ICommunicator *g_commBB = nullptr;
40 ICommunicator *g_commBC = nullptr;
41 ICommunicator *g_commCC = nullptr;
42 ICommunicator *g_commCA = nullptr;
43 }
44
45 class DistributedDBCommunicatorDeepTest : public testing::Test {
46 public:
47 static void SetUpTestCase(void);
48 static void TearDownTestCase(void);
49 void SetUp();
50 void TearDown();
51 };
52
SetUpTestCase(void)53 void DistributedDBCommunicatorDeepTest::SetUpTestCase(void)
54 {
55 /**
56 * @tc.setup: Create and init CommunicatorAggregator and AdapterStub
57 */
58 LOGI("[UT][DeepTest][SetUpTestCase] Enter.");
59 bool isSuccess = SetUpEnv(g_envDeviceA, DEVICE_NAME_A);
60 ASSERT_EQ(isSuccess, true);
61 isSuccess = SetUpEnv(g_envDeviceB, DEVICE_NAME_B);
62 ASSERT_EQ(isSuccess, true);
63 isSuccess = SetUpEnv(g_envDeviceC, DEVICE_NAME_C);
64 ASSERT_EQ(isSuccess, true);
65 DoRegTransformFunction();
66 CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
67 }
68
TearDownTestCase(void)69 void DistributedDBCommunicatorDeepTest::TearDownTestCase(void)
70 {
71 /**
72 * @tc.teardown: Finalize and release CommunicatorAggregator and AdapterStub
73 */
74 LOGI("[UT][DeepTest][TearDownTestCase] Enter.");
75 std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure all thread quiet and memory released
76 TearDownEnv(g_envDeviceA);
77 TearDownEnv(g_envDeviceB);
78 TearDownEnv(g_envDeviceC);
79 CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
80 }
81
82 namespace {
AllocAllCommunicator()83 void AllocAllCommunicator()
84 {
85 int errorNo = E_OK;
86 g_commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
87 ASSERT_NOT_NULL_AND_ACTIVATE(g_commAA, "");
88 g_commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
89 ASSERT_NOT_NULL_AND_ACTIVATE(g_commAB, "");
90 g_commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
91 ASSERT_NOT_NULL_AND_ACTIVATE(g_commBB, "");
92 g_commBC = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_C, errorNo);
93 ASSERT_NOT_NULL_AND_ACTIVATE(g_commBC, "");
94 g_commCC = g_envDeviceC.commAggrHandle->AllocCommunicator(LABEL_C, errorNo);
95 ASSERT_NOT_NULL_AND_ACTIVATE(g_commCC, "");
96 g_commCA = g_envDeviceC.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
97 ASSERT_NOT_NULL_AND_ACTIVATE(g_commCA, "");
98 }
99
ReleaseAllCommunicator()100 void ReleaseAllCommunicator()
101 {
102 g_envDeviceA.commAggrHandle->ReleaseCommunicator(g_commAA);
103 g_commAA = nullptr;
104 g_envDeviceA.commAggrHandle->ReleaseCommunicator(g_commAB);
105 g_commAB = nullptr;
106 g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBB);
107 g_commBB = nullptr;
108 g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBC);
109 g_commBC = nullptr;
110 g_envDeviceC.commAggrHandle->ReleaseCommunicator(g_commCC);
111 g_commCC = nullptr;
112 g_envDeviceC.commAggrHandle->ReleaseCommunicator(g_commCA);
113 g_commCA = nullptr;
114 }
115 }
116
SetUp()117 void DistributedDBCommunicatorDeepTest::SetUp()
118 {
119 DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
120 /**
121 * @tc.setup: Alloc communicator AA, AB, BB, BC, CC, CA
122 */
123 AllocAllCommunicator();
124 }
125
TearDown()126 void DistributedDBCommunicatorDeepTest::TearDown()
127 {
128 /**
129 * @tc.teardown: Release communicator AA, AB, BB, BC, CC, CA
130 */
131 ReleaseAllCommunicator();
132 g_envDeviceA.commAggrHandle->ResetRetryCount();
133 g_envDeviceB.commAggrHandle->ResetRetryCount();
134 g_envDeviceC.commAggrHandle->ResetRetryCount();
135 std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure all thread quiet
136 }
137
138 /**
139 * @tc.name: WaitAndRetrySend 001
140 * @tc.desc: Test send retry semantic
141 * @tc.type: FUNC
142 * @tc.require:
143 * @tc.author: xiaozhenjian
144 */
145 HWTEST_F(DistributedDBCommunicatorDeepTest, WaitAndRetrySend001, TestSize.Level2)
146 {
147 // Preset
148 Message *msgForBB = nullptr;
__anon5d3d689c0302(const std::string &srcTarget, Message *inMsg) 149 g_commBB->RegOnMessageCallback([&msgForBB](const std::string &srcTarget, Message *inMsg) {
150 msgForBB = inMsg;
151 return E_OK;
152 }, nullptr);
153 Message *msgForCA = nullptr;
__anon5d3d689c0402(const std::string &srcTarget, Message *inMsg) 154 g_commCA->RegOnMessageCallback([&msgForCA](const std::string &srcTarget, Message *inMsg) {
155 msgForCA = inMsg;
156 return E_OK;
157 }, nullptr);
158
159 /**
160 * @tc.steps: step1. connect device A with device B
161 */
162 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
163 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceC.adapterHandle);
164 std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure quiet
165
166 /**
167 * @tc.steps: step2. device A simulate send retry
168 */
169 g_envDeviceA.adapterHandle->SimulateSendRetry(DEVICE_NAME_B);
170
171 /**
172 * @tc.steps: step3. device A send message to device B using communicator AB
173 * @tc.expected: step3. communicator BB received no message
174 */
175 Message *msgForAB = BuildRegedTinyMessage();
176 ASSERT_NE(msgForAB, nullptr);
177 SendConfig conf = {true, false, true, 0};
178 int errCode = g_commAB->SendMessage(DEVICE_NAME_B, msgForAB, conf);
179 EXPECT_EQ(errCode, E_OK);
180
181 Message *msgForAA = BuildRegedTinyMessage();
182 ASSERT_NE(msgForAA, nullptr);
183 errCode = g_commAA->SendMessage(DEVICE_NAME_C, msgForAA, conf);
184 EXPECT_EQ(errCode, E_OK);
185 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
186 EXPECT_EQ(msgForBB, nullptr);
187 EXPECT_NE(msgForCA, nullptr);
188 delete msgForCA;
189 msgForCA = nullptr;
190
191 /**
192 * @tc.steps: step4. device A simulate sendable feedback
193 * @tc.expected: step4. communicator BB received the message
194 */
195 g_envDeviceA.adapterHandle->SimulateSendRetryClear(DEVICE_NAME_B);
196 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
197 EXPECT_NE(msgForBB, nullptr);
198 delete msgForBB;
199 msgForBB = nullptr;
200
201 // CleanUp
202 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
203 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceC.adapterHandle);
204 }
205
206 /**
207 * @tc.name: WaitAndRetrySend002
208 * @tc.desc: Test send return retry but task not retry
209 * @tc.type: FUNC
210 * @tc.require:
211 * @tc.author: liaoyonghuang
212 */
213 HWTEST_F(DistributedDBCommunicatorDeepTest, WaitAndRetrySend002, TestSize.Level2)
214 {
215 // Preset
216 Message *msgForCA = nullptr;
__anon5d3d689c0502(const std::string &srcTarget, Message *inMsg) 217 g_commCA->RegOnMessageCallback([&msgForCA](const std::string &srcTarget, Message *inMsg) {
218 msgForCA = inMsg;
219 return E_OK;
220 }, nullptr);
221
222 /**
223 * @tc.steps: step1. connect device A with device B
224 */
225 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceC.adapterHandle);
226 std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure quiet
227
228 /**
229 * @tc.steps: step2. device A simulate send retry
230 */
231 g_envDeviceA.adapterHandle->SimulateSendRetry(DEVICE_NAME_B);
232
233 /**
234 * @tc.steps: step3. device A send message to device B using communicator AB
235 * @tc.expected: step3. communicator BB received no message
236 */
237 Message *msgForAB = BuildRegedTinyMessage();
238 ASSERT_NE(msgForAB, nullptr);
239 SendConfig conf = {true, false, false, 0};
__anon5d3d689c0602(int, int) 240 OnSendEnd onSendEnd = [](int, int) {
241 LOGI("[WaitAndRetrySend002] on send end.");
242 };
243 int errCode = g_commAB->SendMessage(DEVICE_NAME_B, msgForAB, conf, onSendEnd);
244 EXPECT_EQ(errCode, E_OK);
245
246 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
247 EXPECT_EQ(msgForCA, nullptr);
248
249 /**
250 * @tc.steps: step4. device A simulate sendable feedback
251 * @tc.expected: step4. communicator BB received the message
252 */
253 g_envDeviceA.adapterHandle->SimulateSendRetryClear(DEVICE_NAME_B);
254 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
255
256 // CleanUp
257 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceC.adapterHandle);
258 }
259
CreateBufferThenAddIntoScheduler(SendTaskScheduler & scheduler,const std::string & dstTarget,Priority inPrio)260 static int CreateBufferThenAddIntoScheduler(SendTaskScheduler &scheduler, const std::string &dstTarget, Priority inPrio)
261 {
262 SerialBuffer *eachBuff = new (std::nothrow) SerialBuffer();
263 if (eachBuff == nullptr) {
264 return -E_OUT_OF_MEMORY;
265 }
266 int errCode = eachBuff->AllocBufferByTotalLength(100, 0); // 100 totallen without header
267 if (errCode != E_OK) {
268 delete eachBuff;
269 eachBuff = nullptr;
270 return errCode;
271 }
272 SendTask task{eachBuff, dstTarget, nullptr, 0u};
273 errCode = scheduler.AddSendTaskIntoSchedule(task, inPrio);
274 if (errCode != E_OK) {
275 delete eachBuff;
276 eachBuff = nullptr;
277 return errCode;
278 }
279 return E_OK;
280 }
281
282 /**
283 * @tc.name: SendSchedule 001
284 * @tc.desc: Test schedule in Priority order than in send order
285 * @tc.type: FUNC
286 * @tc.require:
287 * @tc.author: xiaozhenjian
288 */
289 HWTEST_F(DistributedDBCommunicatorDeepTest, SendSchedule001, TestSize.Level2)
290 {
291 // Preset
292 SendTaskScheduler scheduler;
293 scheduler.Initialize();
294
295 /**
296 * @tc.steps: step1. Add low priority target A buffer to schecduler
297 */
298 int errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_A, Priority::LOW);
299 EXPECT_EQ(errCode, E_OK);
300
301 /**
302 * @tc.steps: step2. Add low priority target B buffer to schecduler
303 */
304 errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_B, Priority::LOW);
305 EXPECT_EQ(errCode, E_OK);
306
307 /**
308 * @tc.steps: step3. Add normal priority target B buffer to schecduler
309 */
310 errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_B, Priority::NORMAL);
311 EXPECT_EQ(errCode, E_OK);
312
313 /**
314 * @tc.steps: step4. Add normal priority target C buffer to schecduler
315 */
316 errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_C, Priority::NORMAL);
317 EXPECT_EQ(errCode, E_OK);
318
319 /**
320 * @tc.steps: step5. Add high priority target C buffer to schecduler
321 */
322 errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_C, Priority::HIGH);
323 EXPECT_EQ(errCode, E_OK);
324
325 /**
326 * @tc.steps: step6. Add high priority target A buffer to schecduler
327 */
328 errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_A, Priority::HIGH);
329 EXPECT_EQ(errCode, E_OK);
330
331 /**
332 * @tc.steps: step7. schedule out buffers one by one
333 * @tc.expected: step7. the order is: high priority target C
334 * high priority target A
335 * normal priority target B
336 * normal priority target C
337 * low priority target A
338 * low priority target B
339 */
340 SendTask outTask;
341 SendTaskInfo outTaskInfo;
342 uint32_t totalLength = 0;
343 // high priority target C
344 errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
345 ASSERT_EQ(errCode, E_OK);
346 EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_C);
347 EXPECT_EQ(outTaskInfo.taskPrio, Priority::HIGH);
348 scheduler.FinalizeLastScheduleTask();
349 // high priority target A
350 errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
351 ASSERT_EQ(errCode, E_OK);
352 EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_A);
353 EXPECT_EQ(outTaskInfo.taskPrio, Priority::HIGH);
354 scheduler.FinalizeLastScheduleTask();
355 // normal priority target B
356 errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
357 ASSERT_EQ(errCode, E_OK);
358 EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_B);
359 EXPECT_EQ(outTaskInfo.taskPrio, Priority::NORMAL);
360 scheduler.FinalizeLastScheduleTask();
361 // normal priority target C
362 errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
363 ASSERT_EQ(errCode, E_OK);
364 EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_C);
365 EXPECT_EQ(outTaskInfo.taskPrio, Priority::NORMAL);
366 scheduler.FinalizeLastScheduleTask();
367 // low priority target A
368 errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
369 ASSERT_EQ(errCode, E_OK);
370 EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_A);
371 EXPECT_EQ(outTaskInfo.taskPrio, Priority::LOW);
372 scheduler.FinalizeLastScheduleTask();
373 // low priority target B
374 errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
375 ASSERT_EQ(errCode, E_OK);
376 EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_B);
377 EXPECT_EQ(outTaskInfo.taskPrio, Priority::LOW);
378 scheduler.FinalizeLastScheduleTask();
379 }
380
381 /**
382 * @tc.name: Fragment 001
383 * @tc.desc: Test fragmentation in send and receive
384 * @tc.type: FUNC
385 * @tc.require:
386 * @tc.author: xiaozhenjian
387 */
388 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment001, TestSize.Level2)
389 {
390 // Preset
391 Message *recvMsgForBB = nullptr;
__anon5d3d689c0702(const std::string &srcTarget, Message *inMsg) 392 g_commBB->RegOnMessageCallback([&recvMsgForBB](const std::string &srcTarget, Message *inMsg) {
393 recvMsgForBB = inMsg;
394 return E_OK;
395 }, nullptr);
396
397 /**
398 * @tc.steps: step1. connect device A with device B
399 */
400 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
401
402 /**
403 * @tc.steps: step2. device A send message(registered and giant) to device B using communicator AB
404 * @tc.expected: step2. communicator BB received the message
405 */
406 const uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
407 Message *sendMsgForAB = BuildRegedGiantMessage(dataLength);
408 ASSERT_NE(sendMsgForAB, nullptr);
409 SendConfig conf = {false, false, true, 0};
410 int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsgForAB, conf);
411 EXPECT_EQ(errCode, E_OK);
412 std::this_thread::sleep_for(std::chrono::milliseconds(2600)); // Wait 2600 ms to make sure send done
413 ASSERT_NE(recvMsgForBB, nullptr);
414 ASSERT_EQ(recvMsgForBB->GetMessageId(), REGED_GIANT_MSG_ID);
415
416 /**
417 * @tc.steps: step3. Compare received data with send data
418 * @tc.expected: step3. equal
419 */
420 Message *oriMsgForAB = BuildRegedGiantMessage(dataLength);
421 ASSERT_NE(oriMsgForAB, nullptr);
422 const RegedGiantObject *oriObjForAB = oriMsgForAB->GetObject<RegedGiantObject>();
423 ASSERT_NE(oriObjForAB, nullptr);
424 const RegedGiantObject *recvObjForBB = recvMsgForBB->GetObject<RegedGiantObject>();
425 ASSERT_NE(recvObjForBB, nullptr);
426 bool isEqual = RegedGiantObject::CheckEqual(*oriObjForAB, *recvObjForBB);
427 EXPECT_EQ(isEqual, true);
428
429 // CleanUp
430 delete oriMsgForAB;
431 oriMsgForAB = nullptr;
432 delete recvMsgForBB;
433 recvMsgForBB = nullptr;
434 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
435 }
436
437 /**
438 * @tc.name: Fragment 002
439 * @tc.desc: Test fragmentation in partial loss
440 * @tc.type: FUNC
441 * @tc.require:
442 * @tc.author: xiaozhenjian
443 */
444 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment002, TestSize.Level2)
445 {
446 // Preset
447 Message *recvMsgForCC = nullptr;
__anon5d3d689c0802(const std::string &srcTarget, Message *inMsg) 448 g_commCC->RegOnMessageCallback([&recvMsgForCC](const std::string &srcTarget, Message *inMsg) {
449 recvMsgForCC = inMsg;
450 return E_OK;
451 }, nullptr);
452
453 /**
454 * @tc.steps: step1. connect device B with device C
455 */
456 AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
457 std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure quiet
458
459 /**
460 * @tc.steps: step2. device B simulate partial loss
461 */
462 g_envDeviceB.adapterHandle->SimulateSendPartialLoss();
463
464 /**
465 * @tc.steps: step3. device B send message(registered and giant) to device C using communicator BC
466 * @tc.expected: step3. communicator CC not receive the message
467 */
468 uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
469 Message *sendMsgForBC = BuildRegedGiantMessage(dataLength);
470 ASSERT_NE(sendMsgForBC, nullptr);
471 SendConfig conf = {false, false, true, 0};
472 int errCode = g_commBC->SendMessage(DEVICE_NAME_C, sendMsgForBC, conf);
473 EXPECT_EQ(errCode, E_OK);
474 std::this_thread::sleep_for(std::chrono::milliseconds(2600)); // Wait 2600 ms to make sure send done
475 EXPECT_EQ(recvMsgForCC, nullptr);
476
477 /**
478 * @tc.steps: step4. device B not simulate partial loss
479 */
480 g_envDeviceB.adapterHandle->SimulateSendPartialLossClear();
481
482 /**
483 * @tc.steps: step5. device B send message(registered and giant) to device C using communicator BC
484 * @tc.expected: step5. communicator CC received the message, the length equal to the one that is second send
485 */
486 dataLength = 17 * 1024 * 1024; // 17 MB, 1024 is scale
487 Message *resendMsgForBC = BuildRegedGiantMessage(dataLength);
488 ASSERT_NE(resendMsgForBC, nullptr);
489 errCode = g_commBC->SendMessage(DEVICE_NAME_C, resendMsgForBC, conf);
490 EXPECT_EQ(errCode, E_OK);
491 std::this_thread::sleep_for(std::chrono::milliseconds(3400)); // Wait 3400 ms to make sure send done
492 ASSERT_NE(recvMsgForCC, nullptr);
493 ASSERT_EQ(recvMsgForCC->GetMessageId(), REGED_GIANT_MSG_ID);
494 const RegedGiantObject *recvObjForCC = recvMsgForCC->GetObject<RegedGiantObject>();
495 ASSERT_NE(recvObjForCC, nullptr);
496 EXPECT_EQ(dataLength, recvObjForCC->rawData_.size());
497
498 // CleanUp
499 delete recvMsgForCC;
500 recvMsgForCC = nullptr;
501 AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
502 }
503
504 /**
505 * @tc.name: Fragment 003
506 * @tc.desc: Test fragmentation simultaneously
507 * @tc.type: FUNC
508 * @tc.require:
509 * @tc.author: xiaozhenjian
510 */
511 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment003, TestSize.Level3)
512 {
513 // Preset
514 std::atomic<int> count {0};
__anon5d3d689c0902(const std::string &srcTarget, Message *inMsg) 515 OnMessageCallback callback = [&count](const std::string &srcTarget, Message *inMsg) {
516 delete inMsg;
517 inMsg = nullptr;
518 count.fetch_add(1, std::memory_order_seq_cst);
519 return E_OK;
520 };
521 g_commBB->RegOnMessageCallback(callback, nullptr);
522 g_commBC->RegOnMessageCallback(callback, nullptr);
523
524 /**
525 * @tc.steps: step1. connect device A with device B, then device B with device C
526 */
527 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
528 AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
529 std::this_thread::sleep_for(std::chrono::milliseconds(400)); // Wait 400 ms to make sure quiet
530
531 /**
532 * @tc.steps: step2. device A and device C simulate send block
533 */
534 g_envDeviceA.adapterHandle->SimulateSendBlock();
535 g_envDeviceC.adapterHandle->SimulateSendBlock();
536
537 /**
538 * @tc.steps: step3. device A send message(registered and giant) to device B using communicator AB
539 */
540 uint32_t dataLength = 23 * 1024 * 1024; // 23 MB, 1024 is scale
541 Message *sendMsgForAB = BuildRegedGiantMessage(dataLength);
542 ASSERT_NE(sendMsgForAB, nullptr);
543 SendConfig conf = {false, false, true, 0};
544 int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsgForAB, conf);
545 EXPECT_EQ(errCode, E_OK);
546
547 /**
548 * @tc.steps: step4. device C send message(registered and giant) to device B using communicator CC
549 */
550 Message *sendMsgForCC = BuildRegedGiantMessage(dataLength);
551 ASSERT_NE(sendMsgForCC, nullptr);
552 errCode = g_commCC->SendMessage(DEVICE_NAME_B, sendMsgForCC, conf);
553 EXPECT_EQ(errCode, E_OK);
554
555 /**
556 * @tc.steps: step5. device A and device C not simulate send block
557 * @tc.expected: step5. communicator BB and BV received the message
558 */
559 g_envDeviceA.adapterHandle->SimulateSendBlockClear();
560 g_envDeviceC.adapterHandle->SimulateSendBlockClear();
561 std::this_thread::sleep_for(std::chrono::milliseconds(9200)); // Wait 9200 ms to make sure send done
562 EXPECT_EQ(count, 2); // 2 combined message received
563
564 // CleanUp
565 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
566 AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
567 }
568
569 /**
570 * @tc.name: Fragment 004
571 * @tc.desc: Test fragmentation in send and receive when rate limit
572 * @tc.type: FUNC
573 * @tc.require:
574 * @tc.author: zhangqiquan
575 */
576 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment004, TestSize.Level2)
577 {
578 /**
579 * @tc.steps: step1. connect device A with device B
580 */
581 Message *recvMsgForBB = nullptr;
__anon5d3d689c0a02(const std::string &srcTarget, Message *inMsg) 582 g_commBB->RegOnMessageCallback([&recvMsgForBB](const std::string &srcTarget, Message *inMsg) {
583 recvMsgForBB = inMsg;
584 return E_OK;
585 }, nullptr);
586 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
587 std::atomic<int> count = 0;
__anon5d3d689c0b02() 588 g_envDeviceA.adapterHandle->ForkSendBytes([&count]() {
589 count++;
590 if (count % 3 == 0) { // retry each 3 packet
591 return -E_WAIT_RETRY;
592 }
593 return E_OK;
594 });
595 /**
596 * @tc.steps: step2. device A send message(registered and giant) to device B using communicator AB
597 * @tc.expected: step2. communicator BB received the message
598 */
599 const uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
600 Message *sendMsg = BuildRegedGiantMessage(dataLength);
601 ASSERT_NE(sendMsg, nullptr);
602 SendConfig conf = {false, false, true, 0};
603 int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsg, conf);
604 EXPECT_EQ(errCode, E_OK);
605 std::this_thread::sleep_for(std::chrono::seconds(1)); // Wait 1s to make sure send done
606 g_envDeviceA.adapterHandle->SimulateSendRetry(DEVICE_NAME_B);
607 g_envDeviceA.adapterHandle->SimulateSendRetryClear(DEVICE_NAME_B);
608 int reTryTimes = 5;
609 while (recvMsgForBB == nullptr && reTryTimes > 0) {
610 std::this_thread::sleep_for(std::chrono::seconds(3));
611 reTryTimes--;
612 }
613 ASSERT_NE(recvMsgForBB, nullptr);
614 ASSERT_EQ(recvMsgForBB->GetMessageId(), REGED_GIANT_MSG_ID);
615 /**
616 * @tc.steps: step3. Compare received data with send data
617 * @tc.expected: step3. equal
618 */
619 Message *oriMsgForAB = BuildRegedGiantMessage(dataLength);
620 ASSERT_NE(oriMsgForAB, nullptr);
621 auto *recvObjForBB = recvMsgForBB->GetObject<RegedGiantObject>();
622 ASSERT_NE(recvObjForBB, nullptr);
623 auto *oriObjForAB = oriMsgForAB->GetObject<RegedGiantObject>();
624 ASSERT_NE(oriObjForAB, nullptr);
625 bool isEqual = RegedGiantObject::CheckEqual(*oriObjForAB, *recvObjForBB);
626 EXPECT_EQ(isEqual, true);
627 g_envDeviceA.adapterHandle->ForkSendBytes(nullptr);
628
629 // CleanUp
630 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
631 delete oriMsgForAB;
632 oriMsgForAB = nullptr;
633 delete recvMsgForBB;
634 recvMsgForBB = nullptr;
635 }
636
637 namespace {
ClearPreviousTestCaseInfluence()638 void ClearPreviousTestCaseInfluence()
639 {
640 ReleaseAllCommunicator();
641 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
642 AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
643 AdapterStub::ConnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
644 std::this_thread::sleep_for(std::chrono::seconds(10)); // Wait 10 s to make sure all thread quiet
645 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
646 AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
647 AdapterStub::DisconnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
648 AllocAllCommunicator();
649 }
650 }
651
652 /**
653 * @tc.name: ReliableOnline 001
654 * @tc.desc: Test device online reliability
655 * @tc.type: FUNC
656 * @tc.require:
657 * @tc.author: xiaozhenjian
658 */
659 HWTEST_F(DistributedDBCommunicatorDeepTest, ReliableOnline001, TestSize.Level2)
660 {
661 // Preset
662 ClearPreviousTestCaseInfluence();
663 std::atomic<int> count {0};
__anon5d3d689c0d02(const std::string &target, bool isConnect) 664 OnConnectCallback callback = [&count](const std::string &target, bool isConnect) {
665 if (isConnect) {
666 count.fetch_add(1, std::memory_order_seq_cst);
667 }
668 };
669 g_commAA->RegOnConnectCallback(callback, nullptr);
670 g_commAB->RegOnConnectCallback(callback, nullptr);
671 g_commBB->RegOnConnectCallback(callback, nullptr);
672 g_commBC->RegOnConnectCallback(callback, nullptr);
673 g_commCC->RegOnConnectCallback(callback, nullptr);
674 g_commCA->RegOnConnectCallback(callback, nullptr);
675
676 /**
677 * @tc.steps: step1. device A and device B and device C simulate send total loss
678 */
679 g_envDeviceA.adapterHandle->SimulateSendTotalLoss();
680 g_envDeviceB.adapterHandle->SimulateSendTotalLoss();
681 g_envDeviceC.adapterHandle->SimulateSendTotalLoss();
682
683 /**
684 * @tc.steps: step2. connect device A with device B, device B with device C, device C with device A
685 */
686 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
687 AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
688 AdapterStub::ConnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
689
690 /**
691 * @tc.steps: step3. wait a long time
692 * @tc.expected: step3. no communicator received the online callback
693 */
694 std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure quiet
695 EXPECT_EQ(count, 0); // no online callback received
696
697 /**
698 * @tc.steps: step4. device A and device B and device C not simulate send total loss
699 */
700 g_envDeviceA.adapterHandle->SimulateSendTotalLossClear();
701 g_envDeviceB.adapterHandle->SimulateSendTotalLossClear();
702 g_envDeviceC.adapterHandle->SimulateSendTotalLossClear();
703 std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure send done
704 EXPECT_EQ(count, 6); // 6 online callback received in total
705
706 // CleanUp
707 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
708 AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
709 AdapterStub::DisconnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
710 }
711
712 /**
713 * @tc.name: NetworkAdapter001
714 * @tc.desc: Test networkAdapter start func
715 * @tc.type: FUNC
716 * @tc.require:
717 * @tc.author: zhangqiquan
718 */
719 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter001, TestSize.Level1)
720 {
721 auto processCommunicator = std::make_shared<MockProcessCommunicator>();
722 EXPECT_CALL(*processCommunicator, Stop()).WillRepeatedly(testing::Return(OK));
723 /**
724 * @tc.steps: step1. adapter start with empty label
725 * @tc.expected: step1. start failed
726 */
727 auto adapter = std::make_shared<NetworkAdapter>("");
728 EXPECT_EQ(adapter->StartAdapter(), -E_INVALID_ARGS);
729 /**
730 * @tc.steps: step2. adapter start with not empty label but processCommunicator is null
731 * @tc.expected: step2. start failed
732 */
733 adapter = std::make_shared<NetworkAdapter>("label");
734 EXPECT_EQ(adapter->StartAdapter(), -E_INVALID_ARGS);
735 /**
736 * @tc.steps: step3. processCommunicator start not ok
737 * @tc.expected: step3. start failed
738 */
739 adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
740 EXPECT_CALL(*processCommunicator, Start).WillRepeatedly(testing::Return(DB_ERROR));
741 EXPECT_EQ(adapter->StartAdapter(), -E_PERIPHERAL_INTERFACE_FAIL);
742 /**
743 * @tc.steps: step4. processCommunicator reg not ok
744 * @tc.expected: step4. start failed
745 */
746 EXPECT_CALL(*processCommunicator, Start).WillRepeatedly(testing::Return(OK));
747 EXPECT_CALL(*processCommunicator, RegOnDataReceive).WillRepeatedly(testing::Return(DB_ERROR));
748 EXPECT_EQ(adapter->StartAdapter(), -E_PERIPHERAL_INTERFACE_FAIL);
749 EXPECT_CALL(*processCommunicator, RegOnDataReceive).WillRepeatedly(testing::Return(OK));
750 EXPECT_CALL(*processCommunicator, RegOnDeviceChange).WillRepeatedly(testing::Return(DB_ERROR));
751 EXPECT_EQ(adapter->StartAdapter(), -E_PERIPHERAL_INTERFACE_FAIL);
752 /**
753 * @tc.steps: step5. processCommunicator reg ok
754 * @tc.expected: step5. start success
755 */
756 EXPECT_CALL(*processCommunicator, RegOnDeviceChange).WillRepeatedly(testing::Return(OK));
__anon5d3d689c0e02() 757 EXPECT_CALL(*processCommunicator, GetLocalDeviceInfos).WillRepeatedly([]() {
758 DeviceInfos deviceInfos;
759 deviceInfos.identifier = "DEVICES_A"; // local is deviceA
760 return deviceInfos;
761 });
__anon5d3d689c0f02() 762 EXPECT_CALL(*processCommunicator, GetRemoteOnlineDeviceInfosList).WillRepeatedly([]() {
763 std::vector<DeviceInfos> res;
764 DeviceInfos deviceInfos;
765 deviceInfos.identifier = "DEVICES_A"; // search local is deviceA
766 res.push_back(deviceInfos);
767 deviceInfos.identifier = "DEVICES_B"; // search remote is deviceB
768 res.push_back(deviceInfos);
769 return res;
770 });
__anon5d3d689c1002(const DeviceInfos &) 771 EXPECT_CALL(*processCommunicator, IsSameProcessLabelStartedOnPeerDevice).WillRepeatedly([](const DeviceInfos &) {
772 return false;
773 });
774 EXPECT_EQ(adapter->StartAdapter(), E_OK);
775 RuntimeContext::GetInstance()->StopTaskPool();
776 }
777
778 /**
779 * @tc.name: NetworkAdapter002
780 * @tc.desc: Test networkAdapter get mtu func
781 * @tc.type: FUNC
782 * @tc.require:
783 * @tc.author: zhangqiquan
784 */
785 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter002, TestSize.Level1)
786 {
787 auto processCommunicator = std::make_shared<MockProcessCommunicator>();
788 auto adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
789 /**
790 * @tc.steps: step1. processCommunicator return 0 mtu
791 * @tc.expected: step1. adapter will adjust to min mtu
792 */
__anon5d3d689c1102() 793 EXPECT_CALL(*processCommunicator, GetMtuSize).WillRepeatedly([]() {
794 return 0u;
795 });
796 EXPECT_EQ(adapter->GetMtuSize(), DBConstant::MIN_MTU_SIZE);
797 /**
798 * @tc.steps: step2. processCommunicator return 2 max mtu
799 * @tc.expected: step2. adapter will return min mtu util re make
800 */
__anon5d3d689c1202() 801 EXPECT_CALL(*processCommunicator, GetMtuSize).WillRepeatedly([]() {
802 return 2 * DBConstant::MAX_MTU_SIZE;
803 });
804 EXPECT_EQ(adapter->GetMtuSize(), DBConstant::MIN_MTU_SIZE);
805 adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
806 EXPECT_EQ(adapter->GetMtuSize(), DBConstant::MAX_MTU_SIZE);
807 }
808
809 /**
810 * @tc.name: NetworkAdapter003
811 * @tc.desc: Test networkAdapter get timeout func
812 * @tc.type: FUNC
813 * @tc.require:
814 * @tc.author: zhangqiquan
815 */
816 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter003, TestSize.Level1)
817 {
818 auto processCommunicator = std::make_shared<MockProcessCommunicator>();
819 auto adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
820 /**
821 * @tc.steps: step1. processCommunicator return 0 timeout
822 * @tc.expected: step1. adapter will adjust to min timeout
823 */
__anon5d3d689c1302() 824 EXPECT_CALL(*processCommunicator, GetTimeout).WillRepeatedly([]() {
825 return 0u;
826 });
827 EXPECT_EQ(adapter->GetTimeout(), DBConstant::MIN_TIMEOUT);
828 /**
829 * @tc.steps: step2. processCommunicator return 2 max timeout
830 * @tc.expected: step2. adapter will adjust to max timeout
831 */
__anon5d3d689c1402() 832 EXPECT_CALL(*processCommunicator, GetTimeout).WillRepeatedly([]() {
833 return 2 * DBConstant::MAX_TIMEOUT;
834 });
835 EXPECT_EQ(adapter->GetTimeout(), DBConstant::MAX_TIMEOUT);
836 }
837
838 /**
839 * @tc.name: NetworkAdapter004
840 * @tc.desc: Test networkAdapter send bytes func
841 * @tc.type: FUNC
842 * @tc.require:
843 * @tc.author: zhangqiquan
844 */
845 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter004, TestSize.Level1)
846 {
847 auto processCommunicator = std::make_shared<MockProcessCommunicator>();
848 auto adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
849
__anon5d3d689c1502(const DeviceInfos &, const uint8_t *, uint32_t) 850 EXPECT_CALL(*processCommunicator, SendData).WillRepeatedly([](const DeviceInfos &, const uint8_t *, uint32_t) {
851 return OK;
852 });
853 /**
854 * @tc.steps: step1. adapter send data with error param
855 * @tc.expected: step1. adapter send failed
856 */
857 auto data = std::make_shared<uint8_t>(1u);
858 EXPECT_EQ(adapter->SendBytes("DEVICES_B", nullptr, 1, 0), -E_INVALID_ARGS);
859 EXPECT_EQ(adapter->SendBytes("DEVICES_B", data.get(), 0, 0), -E_INVALID_ARGS);
860 /**
861 * @tc.steps: step2. adapter send data with right param
862 * @tc.expected: step2. adapter send ok
863 */
864 EXPECT_EQ(adapter->SendBytes("DEVICES_B", data.get(), 1, 0), E_OK);
865 RuntimeContext::GetInstance()->StopTaskPool();
866 }
867
868 namespace {
InitAdapter(const std::shared_ptr<NetworkAdapter> & adapter,const std::shared_ptr<MockProcessCommunicator> & processCommunicator,OnDataReceive & onDataReceive,OnDeviceChange & onDataChange)869 void InitAdapter(const std::shared_ptr<NetworkAdapter> &adapter,
870 const std::shared_ptr<MockProcessCommunicator> &processCommunicator,
871 OnDataReceive &onDataReceive, OnDeviceChange &onDataChange)
872 {
873 EXPECT_CALL(*processCommunicator, Stop).WillRepeatedly([]() {
874 return OK;
875 });
876 EXPECT_CALL(*processCommunicator, Start).WillRepeatedly([](const std::string &) {
877 return OK;
878 });
879 EXPECT_CALL(*processCommunicator, RegOnDataReceive).WillRepeatedly(
880 [&onDataReceive](const OnDataReceive &callback) {
881 onDataReceive = callback;
882 return OK;
883 });
884 EXPECT_CALL(*processCommunicator, RegOnDeviceChange).WillRepeatedly(
885 [&onDataChange](const OnDeviceChange &callback) {
886 onDataChange = callback;
887 return OK;
888 });
889 EXPECT_CALL(*processCommunicator, GetRemoteOnlineDeviceInfosList).WillRepeatedly([]() {
890 std::vector<DeviceInfos> res;
891 return res;
892 });
893 EXPECT_CALL(*processCommunicator, IsSameProcessLabelStartedOnPeerDevice).WillRepeatedly([](const DeviceInfos &) {
894 return false;
895 });
896 EXPECT_EQ(adapter->StartAdapter(), E_OK);
897 }
898 }
899 /**
900 * @tc.name: NetworkAdapter005
901 * @tc.desc: Test networkAdapter receive data func
902 * @tc.type: FUNC
903 * @tc.require:
904 * @tc.author: zhangqiquan
905 */
906 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter005, TestSize.Level1)
907 {
908 auto processCommunicator = std::make_shared<MockProcessCommunicator>();
909 auto adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
910 OnDataReceive onDataReceive;
911 OnDeviceChange onDeviceChange;
912 InitAdapter(adapter, processCommunicator, onDataReceive, onDeviceChange);
913 ASSERT_NE(onDataReceive, nullptr);
914 /**
915 * @tc.steps: step1. adapter recv data with error param
916 */
917 auto data = std::make_shared<uint8_t>(1);
918 DeviceInfos deviceInfos;
919 onDataReceive(deviceInfos, nullptr, 1);
920 onDataReceive(deviceInfos, data.get(), 0);
921 /**
922 * @tc.steps: step2. adapter recv data with no permission
923 */
__anon5d3d689c1d02(DataHeadInfo, uint32_t &) 924 EXPECT_CALL(*processCommunicator, GetDataHeadInfo).WillRepeatedly([](DataHeadInfo, uint32_t &) {
925 return NO_PERMISSION;
926 });
927 onDataReceive(deviceInfos, data.get(), 1);
__anon5d3d689c1e02(DataHeadInfo, uint32_t &) 928 EXPECT_CALL(*processCommunicator, GetDataHeadInfo).WillRepeatedly([](DataHeadInfo, uint32_t &) {
929 return OK;
930 });
931 EXPECT_CALL(*processCommunicator, GetDataUserInfo).WillRepeatedly(
__anon5d3d689c1f02(DataUserInfo, std::vector<UserInfo> &userInfos) 932 [](DataUserInfo, std::vector<UserInfo> &userInfos) {
933 UserInfo userId = {"1"};
934 userInfos.emplace_back(userId);
935 return OK;
936 });
937 /**
938 * @tc.steps: step3. adapter recv data with no callback
939 */
940 onDataReceive(deviceInfos, data.get(), 1);
__anon5d3d689c2002(const ReceiveBytesInfo &, const DataUserInfoProc &) 941 adapter->RegBytesReceiveCallback([](const ReceiveBytesInfo &, const DataUserInfoProc &) {
942 }, nullptr);
943 onDataReceive(deviceInfos, data.get(), 1);
944 RuntimeContext::GetInstance()->StopTaskPool();
945 }
946
947 /**
948 * @tc.name: NetworkAdapter006
949 * @tc.desc: Test networkAdapter device change func
950 * @tc.type: FUNC
951 * @tc.require:
952 * @tc.author: zhangqiquan
953 */
954 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter006, TestSize.Level1)
955 {
956 auto processCommunicator = std::make_shared<MockProcessCommunicator>();
957 auto adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
958 OnDataReceive onDataReceive;
959 OnDeviceChange onDeviceChange;
960 InitAdapter(adapter, processCommunicator, onDataReceive, onDeviceChange);
961 ASSERT_NE(onDeviceChange, nullptr);
962 DeviceInfos deviceInfos;
963 /**
964 * @tc.steps: step1. onDeviceChange with no same process
965 */
966 onDeviceChange(deviceInfos, true);
967 /**
968 * @tc.steps: step2. onDeviceChange with same process
969 */
__anon5d3d689c2102(const DeviceInfos &) 970 EXPECT_CALL(*processCommunicator, IsSameProcessLabelStartedOnPeerDevice).WillRepeatedly([](const DeviceInfos &) {
971 return true;
972 });
973 onDeviceChange(deviceInfos, true);
__anon5d3d689c2202(const std::string &, bool) 974 adapter->RegTargetChangeCallback([](const std::string &, bool) {
975 }, nullptr);
976 onDeviceChange(deviceInfos, false);
977 /**
978 * @tc.steps: step3. adapter send data with db_error
979 * @tc.expected: step3. adapter send failed
980 */
981 onDeviceChange(deviceInfos, true);
__anon5d3d689c2302(const DeviceInfos &, const uint8_t *, uint32_t) 982 EXPECT_CALL(*processCommunicator, SendData).WillRepeatedly([](const DeviceInfos &, const uint8_t *, uint32_t) {
983 return DB_ERROR;
984 });
__anon5d3d689c2402(const DeviceInfos &) 985 EXPECT_CALL(*processCommunicator, IsSameProcessLabelStartedOnPeerDevice).WillRepeatedly([](const DeviceInfos &) {
986 return false;
987 });
988 auto data = std::make_shared<uint8_t>(1);
989 EXPECT_EQ(adapter->SendBytes("", data.get(), 1, 0), static_cast<int>(DB_ERROR));
990 RuntimeContext::GetInstance()->StopTaskPool();
991 EXPECT_EQ(adapter->IsDeviceOnline(""), false);
992 ExtendInfo info;
993 EXPECT_EQ(adapter->GetExtendHeaderHandle(info), nullptr);
994 }
995
996 /**
997 * @tc.name: NetworkAdapter007
998 * @tc.desc: Test networkAdapter recv invalid head length
999 * @tc.type: FUNC
1000 * @tc.require:
1001 * @tc.author: zhangqiquan
1002 */
1003 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter007, TestSize.Level1)
1004 {
1005 auto processCommunicator = std::make_shared<MockProcessCommunicator>();
1006 auto adapter = std::make_shared<NetworkAdapter>("NetworkAdapter007", processCommunicator);
1007 OnDataReceive onDataReceive;
1008 OnDeviceChange onDeviceChange;
1009 InitAdapter(adapter, processCommunicator, onDataReceive, onDeviceChange);
1010 ASSERT_NE(onDeviceChange, nullptr);
1011 /**
1012 * @tc.steps: step1. GetDataHeadInfo return invalid headLen
1013 * @tc.expected: step1. adapter check this len
1014 */
__anon5d3d689c2502(DataHeadInfo, uint32_t &headLen) 1015 EXPECT_CALL(*processCommunicator, GetDataHeadInfo).WillOnce([](DataHeadInfo, uint32_t &headLen) {
1016 headLen = UINT32_MAX;
1017 return OK;
1018 });
1019 /**
1020 * @tc.steps: step2. Adapter ignore data because len is too large
1021 * @tc.expected: step2. BytesReceive never call
1022 */
1023 int callByteReceiveCount = 0;
1024 int res = adapter->RegBytesReceiveCallback([&callByteReceiveCount](const ReceiveBytesInfo &,
__anon5d3d689c2602(const ReceiveBytesInfo &, const DataUserInfoProc &) 1025 const DataUserInfoProc &) {
1026 LOGD("callByteReceiveCount++;");
1027 callByteReceiveCount++;
1028 }, nullptr);
1029 EXPECT_EQ(res, E_OK);
1030 std::vector<uint8_t> data = { 1u };
1031 DeviceInfos deviceInfos;
1032 onDataReceive(deviceInfos, data.data(), 1u);
1033 LOGD("callByteReceiveCount++%d;", callByteReceiveCount);
1034 EXPECT_EQ(callByteReceiveCount, 0);
1035 }
1036
1037 /**
1038 * @tc.name: RetrySendExceededLimit001
1039 * @tc.desc: Test send result when the number of retry times exceeds the limit
1040 * @tc.type: FUNC
1041 * @tc.require:
1042 * @tc.author: suyue
1043 */
1044 HWTEST_F(DistributedDBCommunicatorDeepTest, RetrySendExceededLimit001, TestSize.Level2)
1045 {
1046 /**
1047 * @tc.steps: step1. connect device A with device B and fork SendBytes
1048 * @tc.expected: step1. operation OK
1049 */
1050 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
1051 std::atomic<int> count = 0;
__anon5d3d689c2702() 1052 g_envDeviceA.adapterHandle->ForkSendBytes([&count]() {
1053 count++;
1054 return -E_WAIT_RETRY;
1055 });
1056
1057 /**
1058 * @tc.steps: step2. the number of retry times for device A to send a message exceeds the limit
1059 * @tc.expected: step2. sendResult fail
1060 */
1061 std::vector<std::pair<int, bool>> sendResult;
__anon5d3d689c2802(int result, bool isDirectEnd) 1062 auto sendResultNotifier = [&sendResult](int result, bool isDirectEnd) {
1063 sendResult.push_back(std::pair<int, bool>(result, isDirectEnd));
1064 };
1065 const uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
1066 Message *sendMsg = BuildRegedGiantMessage(dataLength);
1067 ASSERT_NE(sendMsg, nullptr);
1068 SendConfig conf = {false, false, true, 0};
1069 int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsg, conf, sendResultNotifier);
1070 EXPECT_EQ(errCode, E_OK);
1071 std::this_thread::sleep_for(std::chrono::seconds(1)); // Wait 1s to make sure send done
1072 g_envDeviceA.adapterHandle->SimulateSendRetry(DEVICE_NAME_B);
1073 g_envDeviceA.adapterHandle->SimulateSendRetryClear(DEVICE_NAME_B, -E_BASE);
1074 int reTryTimes = 5;
1075 while ((count < 4) && (reTryTimes > 0)) { // Wait to make sure retry exceeds the limit
1076 std::this_thread::sleep_for(std::chrono::seconds(3));
1077 reTryTimes--;
1078 }
1079 ASSERT_EQ(sendResult.size(), static_cast<size_t>(1)); // only one callback result notification
1080 EXPECT_EQ(sendResult[0].first, -E_BASE); // index 0 retry fail
1081 EXPECT_EQ(sendResult[0].second, false);
1082
1083 g_envDeviceA.adapterHandle->ForkSendBytes(nullptr);
1084 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
1085 }
1086
1087 /**
1088 * @tc.name: RetrySendExceededLimit002
1089 * @tc.desc: Test multi thread call SendableCallback when the number of retry times exceeds the limit
1090 * @tc.type: FUNC
1091 * @tc.require:
1092 * @tc.author: suyue
1093 */
1094 HWTEST_F(DistributedDBCommunicatorDeepTest, RetrySendExceededLimit002, TestSize.Level2)
1095 {
1096 /**
1097 * @tc.steps: step1. DeviceA send SendMessage and set SendBytes interface return -E_WAIT_RETRY
1098 * @tc.expected: step1. Send ok
1099 */
1100 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
1101 std::atomic<int> count = 0;
__anon5d3d689c2902() 1102 g_envDeviceA.adapterHandle->ForkSendBytes([&count]() {
1103 count++;
1104 return -E_WAIT_RETRY;
1105 });
1106 std::vector<std::pair<int, bool>> sendResult;
__anon5d3d689c2a02(int result, bool isDirectEnd) 1107 auto sendResultNotifier = [&sendResult](int result, bool isDirectEnd) {
1108 sendResult.push_back(std::pair<int, bool>(result, isDirectEnd));
1109 };
1110 const uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
1111 Message *sendMsg = BuildRegedGiantMessage(dataLength);
1112 ASSERT_NE(sendMsg, nullptr);
1113 SendConfig conf = {false, false, true, 0};
1114 EXPECT_EQ(g_commAB->SendMessage(DEVICE_NAME_B, sendMsg, conf, sendResultNotifier), E_OK);
1115 std::this_thread::sleep_for(std::chrono::seconds(1)); // Wait 1s to make sure send done
1116
1117 /**
1118 * @tc.steps: step2. Triggering multi thread call SendableCallback interface and set errorCode
1119 * @tc.expected: step2. Callback success
1120 */
1121 std::vector<std::thread> threads;
1122 int threadNum = 3;
1123 threads.reserve(threadNum);
1124 for (int n = 0; n < threadNum; n++) {
__anon5d3d689c2b02() 1125 threads.emplace_back([&]() {
1126 g_envDeviceA.adapterHandle->SimulateTriggerSendableCallback(DEVICE_NAME_B, -E_BASE);
1127 });
1128 }
1129 for (std::thread &t : threads) {
1130 t.join();
1131 }
1132
1133 /**
1134 * @tc.steps: step3. Make The number of messages sent by device A exceed the limit
1135 * @tc.expected: step3. SendResult is the errorCode set by SendableCallback interface
1136 */
1137 int reTryTimes = 5;
1138 while ((count < 4) && (reTryTimes > 0)) {
1139 std::this_thread::sleep_for(std::chrono::seconds(3));
1140 reTryTimes--;
1141 }
1142 ASSERT_EQ(sendResult.size(), static_cast<size_t>(1));
1143 EXPECT_EQ(sendResult[0].first, -E_BASE);
1144 EXPECT_EQ(sendResult[0].second, false);
1145 g_envDeviceA.adapterHandle->ForkSendBytes(nullptr);
1146 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
1147 }
1148
1149 /**
1150 * @tc.name: AllocBufferByPayloadLengthTest001
1151 * @tc.desc: Test AllocBufferByPayloadLength func
1152 * @tc.type: FUNC
1153 * @tc.require:
1154 * @tc.author: tiansimiao
1155 */
1156 HWTEST_F(DistributedDBCommunicatorDeepTest, AllocBufferByPayloadLengthTest001, TestSize.Level2)
1157 {
1158 SerialBuffer buffer;
1159 EXPECT_EQ(buffer.AllocBufferByPayloadLength(100, 20), E_OK);
1160 EXPECT_EQ(buffer.AllocBufferByPayloadLength(100, 20), -E_NOT_PERMIT);
1161 }
1162
1163 /**
1164 * @tc.name: AllocBufferByPayloadLengthTest002
1165 * @tc.desc: Test AllocBufferByPayloadLength func
1166 * @tc.type: FUNC
1167 * @tc.require:
1168 * @tc.author: tiansimiao
1169 */
1170 HWTEST_F(DistributedDBCommunicatorDeepTest, AllocBufferByPayloadLengthTest002, TestSize.Level2)
1171 {
1172 SerialBuffer buffer;
1173 uint32_t payloadLen = INT32_MAX - 1;
1174 uint32_t headerLen = 10;
1175 EXPECT_EQ(buffer.AllocBufferByPayloadLength(payloadLen, headerLen), -E_INVALID_ARGS);
1176 }
1177
1178 /**
1179 * @tc.name: AllocBufferByPayloadLengthTest003
1180 * @tc.desc: Test AllocBufferByPayloadLength func
1181 * @tc.type: FUNC
1182 * @tc.require:
1183 * @tc.author: tiansimiao
1184 */
1185 HWTEST_F(DistributedDBCommunicatorDeepTest, AllocBufferByPayloadLengthTest003, TestSize.Level2)
1186 {
1187 SerialBuffer buffer;
1188 const uint8_t externalBuff[100] = {0};
1189 int ret = buffer.SetExternalBuff(externalBuff, 100, 20);
1190 EXPECT_EQ(E_OK, ret);
1191 EXPECT_EQ(buffer.AllocBufferByPayloadLength(100, 20), -E_NOT_PERMIT);
1192 }
1193
1194 /**
1195 * @tc.name: AllocBufferByTotalLengthTest001
1196 * @tc.desc: Test AllocBufferByPayloadLength func
1197 * @tc.type: FUNC
1198 * @tc.require:
1199 * @tc.author: tiansimiao
1200 */
1201 HWTEST_F(DistributedDBCommunicatorDeepTest, AllocBufferByTotalLengthTest001, TestSize.Level2)
1202 {
1203 SerialBuffer buffer;
1204 uint32_t payloadLen = 100;
1205 uint32_t headerLen = 20;
1206 buffer.AllocBufferByPayloadLength(payloadLen, headerLen);
1207 EXPECT_EQ(buffer.AllocBufferByTotalLength(100, 20), -E_NOT_PERMIT);
1208 }
1209
1210 /**
1211 * @tc.name: AllocBufferByTotalLengthTest002
1212 * @tc.desc: Test AllocBufferByPayloadLength func
1213 * @tc.type: FUNC
1214 * @tc.require:
1215 * @tc.author: tiansimiao
1216 */
1217 HWTEST_F(DistributedDBCommunicatorDeepTest, AllocBufferByTotalLengthTest002, TestSize.Level2)
1218 {
1219 SerialBuffer buffer;
1220 const uint8_t* externalBuff = new(std::nothrow) uint8_t[100];
1221 ASSERT_NE(externalBuff, nullptr);
1222 uint8_t tempArray[100] = {0};
1223 EXPECT_EQ(memcpy_s(const_cast<uint8_t*>(externalBuff), 100, tempArray, 100), E_OK);
1224 buffer.SetExternalBuff(externalBuff, 100, 20);
1225 EXPECT_EQ(buffer.AllocBufferByTotalLength(100, 20), -E_NOT_PERMIT);
1226 delete[] externalBuff;
1227 externalBuff = nullptr;
1228 }
1229
1230 /**
1231 * @tc.name: AllocBufferByTotalLengthTest003
1232 * @tc.desc: Test AllocBufferByPayloadLength func
1233 * @tc.type: FUNC
1234 * @tc.require:
1235 * @tc.author: tiansimiao
1236 */
1237 HWTEST_F(DistributedDBCommunicatorDeepTest, AllocBufferByTotalLengthTest003, TestSize.Level2)
1238 {
1239 SerialBuffer buffer;
1240 EXPECT_EQ(buffer.AllocBufferByTotalLength(0, 20), -E_INVALID_ARGS);
1241 EXPECT_EQ(buffer.AllocBufferByTotalLength(MAX_TOTAL_LEN + 1, 20), -E_INVALID_ARGS);
1242 EXPECT_EQ(buffer.AllocBufferByTotalLength(5, 10), -E_INVALID_ARGS);
1243 }
1244
1245 /**
1246 * @tc.name: SetExternalBuffTest001
1247 * @tc.desc: Test SetExternalBuff func
1248 * @tc.type: FUNC
1249 * @tc.require:
1250 * @tc.author: tiansimiao
1251 */
1252 HWTEST_F(DistributedDBCommunicatorDeepTest, SetExternalBuffTest001, TestSize.Level2)
1253 {
1254 SerialBuffer buffer;
1255 uint32_t payloadLen = 100;
1256 uint32_t headerLen = 20;
1257 buffer.AllocBufferByPayloadLength(payloadLen, headerLen);
1258 const uint8_t* externalBuff = new(std::nothrow) uint8_t[100];
1259 ASSERT_NE(externalBuff, nullptr);
1260 EXPECT_EQ(buffer.SetExternalBuff(externalBuff, 100, 20), -E_NOT_PERMIT);
1261 delete[] externalBuff;
1262 externalBuff = nullptr;
1263 }
1264
1265 /**
1266 * @tc.name: SetExternalBuffTest002
1267 * @tc.desc: Test SetExternalBuff func
1268 * @tc.type: FUNC
1269 * @tc.require:
1270 * @tc.author: tiansimiao
1271 */
1272 HWTEST_F(DistributedDBCommunicatorDeepTest, SetExternalBuffTest002, TestSize.Level2)
1273 {
1274 SerialBuffer buffer;
1275 const uint8_t* externalBuff = new(std::nothrow) uint8_t[100];
1276 ASSERT_NE(externalBuff, nullptr);
1277 buffer.SetExternalBuff(externalBuff, 100, 20);
1278 EXPECT_EQ(buffer.SetExternalBuff(externalBuff, 100, 20), -E_NOT_PERMIT);
1279 delete[] externalBuff;
1280 externalBuff = nullptr;
1281 }
1282
1283 /**
1284 * @tc.name: SetExternalBuffTest003
1285 * @tc.desc: Test SetExternalBuff func
1286 * @tc.type: FUNC
1287 * @tc.require:
1288 * @tc.author: tiansimiao
1289 */
1290 HWTEST_F(DistributedDBCommunicatorDeepTest, SetExternalBuffTest003, TestSize.Level2)
1291 {
1292 SerialBuffer buffer;
1293 EXPECT_EQ(buffer.SetExternalBuff(nullptr, 100, 20), -E_INVALID_ARGS);
1294 const uint8_t* externalBuff = new(std::nothrow) uint8_t[100];
1295 ASSERT_NE(externalBuff, nullptr);
1296 uint8_t tempArrayA[100] = {0};
1297 EXPECT_EQ(memcpy_s(const_cast<uint8_t*>(externalBuff), 100, tempArrayA, 100), E_OK);
1298 EXPECT_EQ(buffer.SetExternalBuff(externalBuff, 0, 20), -E_INVALID_ARGS);
1299 delete[] externalBuff;
1300 externalBuff = nullptr;
1301 externalBuff = new(std::nothrow) uint8_t[MAX_TOTAL_LEN + 1];
1302 ASSERT_NE(externalBuff, nullptr);
1303 EXPECT_EQ(buffer.SetExternalBuff(externalBuff, MAX_TOTAL_LEN + 1, 20), -E_INVALID_ARGS);
1304 delete[] externalBuff;
1305 externalBuff = nullptr;
1306 externalBuff = new(std::nothrow) uint8_t[10];
1307 ASSERT_NE(externalBuff, nullptr);
1308 uint8_t tempArrayB[10] = {0};
1309 EXPECT_EQ(memcpy_s(const_cast<uint8_t*>(externalBuff), 10, tempArrayB, 10), E_OK);
1310 EXPECT_EQ(buffer.SetExternalBuff(externalBuff, 5, 10), -E_INVALID_ARGS);
1311 delete[] externalBuff;
1312 externalBuff = nullptr;
1313 }
1314
1315 /**
1316 * @tc.name: SerialBufferCloneTest001
1317 * @tc.desc: Test invalid args of Clone function
1318 * @tc.type: FUNC
1319 * @tc.require:
1320 * @tc.author: tiansimiao
1321 */
1322 HWTEST_F(DistributedDBCommunicatorDeepTest, SerialBufferCloneTest001, TestSize.Level2)
1323 {
1324 SerialBuffer buffer;
1325 int errorNo = 0;
1326 SerialBuffer* clone_ = buffer.Clone(errorNo);
1327 EXPECT_EQ(clone_, nullptr);
1328 EXPECT_EQ(errorNo, -E_INVALID_ARGS);
1329 }
1330
1331 /**
1332 * @tc.name: ConvertForCrossThreadTest001
1333 * @tc.desc: Test invalid args of ConvertForCrossThread function
1334 * @tc.type: FUNC
1335 * @tc.require:
1336 * @tc.author: tiansimiao
1337 */
1338 HWTEST_F(DistributedDBCommunicatorDeepTest, ConvertForCrossThreadTest001, TestSize.Level2)
1339 {
1340 SerialBuffer buffer;
1341 EXPECT_EQ(buffer.ConvertForCrossThread(), -E_INVALID_ARGS);
1342 }
1343
1344 /**
1345 * @tc.name: GetSizeTest001
1346 * @tc.desc: Test GetSize function
1347 * @tc.type: FUNC
1348 * @tc.require:
1349 * @tc.author: tiansimiao
1350 */
1351 HWTEST_F(DistributedDBCommunicatorDeepTest, GetSizeTest001, TestSize.Level2)
1352 {
1353 SerialBuffer buffer;
1354 EXPECT_EQ(buffer.GetSize(), 0);
1355 }
1356
1357 /**
1358 * @tc.name: GetWritableBytesTest001
1359 * @tc.desc: Test GetWritableBytesForEntireBuffer and EntireFrame and Header and Payload function
1360 * @tc.type: FUNC
1361 * @tc.require:
1362 * @tc.author: tiansimiao
1363 */
1364 HWTEST_F(DistributedDBCommunicatorDeepTest, GetWritableBytesTest001, TestSize.Level2)
1365 {
1366 SerialBuffer buffer;
1367 EXPECT_EQ(buffer.GetWritableBytesForEntireBuffer().first, nullptr);
1368 EXPECT_EQ(buffer.GetWritableBytesForEntireBuffer().second, 0);
1369 EXPECT_EQ(buffer.GetWritableBytesForEntireFrame().first, nullptr);
1370 EXPECT_EQ(buffer.GetWritableBytesForEntireFrame().second, 0);
1371 EXPECT_EQ(buffer.GetWritableBytesForHeader().first, nullptr);
1372 EXPECT_EQ(buffer.GetWritableBytesForHeader().second, 0);
1373 EXPECT_EQ(buffer.GetWritableBytesForPayload().first, nullptr);
1374 EXPECT_EQ(buffer.GetWritableBytesForPayload().second, 0);
1375 }
1376
1377 /**
1378 * @tc.name: GetWritableBytesTest002
1379 * @tc.desc: Test GetWritableBytesForEntireBuffer function
1380 * @tc.type: FUNC
1381 * @tc.require:
1382 * @tc.author: tiansimiao
1383 */
1384 HWTEST_F(DistributedDBCommunicatorDeepTest, GetWritableBytesTest002, TestSize.Level2)
1385 {
1386 SerialBuffer buffer;
1387 uint32_t payloadLen = 100;
1388 uint32_t headerLen = 20;
1389 EXPECT_EQ(buffer.AllocBufferByPayloadLength(payloadLen, headerLen), E_OK);
1390 EXPECT_NE(buffer.GetWritableBytesForEntireBuffer().first, nullptr);
1391 EXPECT_EQ(buffer.GetWritableBytesForEntireBuffer().second, buffer.GetSize());
1392 }
1393
1394 /**
1395 * @tc.name: GetReadOnlyBytesTest001
1396 * @tc.desc: Test GetReadOnlyBytesForEntireBuffer and EntireFrame and Header and Payload function
1397 * @tc.type: FUNC
1398 * @tc.require:
1399 * @tc.author: tiansimiao
1400 */
1401 HWTEST_F(DistributedDBCommunicatorDeepTest, GetReadOnlyBytesTest001, TestSize.Level2)
1402 {
1403 SerialBuffer buffer;
1404 EXPECT_EQ(buffer.GetReadOnlyBytesForEntireBuffer().first, nullptr);
1405 EXPECT_EQ(buffer.GetReadOnlyBytesForEntireBuffer().second, 0);
1406 EXPECT_EQ(buffer.GetReadOnlyBytesForEntireFrame().first, nullptr);
1407 EXPECT_EQ(buffer.GetReadOnlyBytesForEntireFrame().second, 0);
1408 EXPECT_EQ(buffer.GetReadOnlyBytesForHeader().first, nullptr);
1409 EXPECT_EQ(buffer.GetReadOnlyBytesForHeader().second, 0);
1410 EXPECT_EQ(buffer.GetReadOnlyBytesForPayload().first, nullptr);
1411 EXPECT_EQ(buffer.GetReadOnlyBytesForPayload().second, 0);
1412 }
1413
1414 /**
1415 * @tc.name: GetReadOnlyBytesTest002
1416 * @tc.desc: Test GetReadOnlyBytesForHeader function
1417 * @tc.type: FUNC
1418 * @tc.require:
1419 * @tc.author: tiansimiao
1420 */
1421 HWTEST_F(DistributedDBCommunicatorDeepTest, GetReadOnlyBytesTest002, TestSize.Level2)
1422 {
1423 SerialBuffer buffer;
1424 uint32_t payloadLen = 100;
1425 uint32_t headerLen = 20;
1426 EXPECT_EQ(buffer.AllocBufferByPayloadLength(payloadLen, headerLen), E_OK);
1427 EXPECT_NE(buffer.GetReadOnlyBytesForHeader().first, nullptr);
1428 EXPECT_NE(buffer.GetReadOnlyBytesForHeader().second, 0);
1429 }
1430
1431 /**
1432 * @tc.name: DoOnSendEndByTaskIfNeedTest001
1433 * @tc.desc: Test DoOnSendEndByTaskIfNeed function
1434 * @tc.type: FUNC
1435 * @tc.require:
1436 * @tc.author: tiansimiao
1437 */
1438 HWTEST_F(DistributedDBCommunicatorDeepTest, DoOnSendEndByTaskIfNeedTest001, TestSize.Level1)
1439 {
1440 std::string dstTarget = DEVICE_NAME_B;
1441 FrameType inType = FrameType::APPLICATION_MESSAGE;
1442 TaskConfig config;
1443 config.nonBlock = true;
1444 config.isRetryTask = false;
1445 config.timeout = 1000;
1446 OnSendEnd onEnd = nullptr;
1447 const std::shared_ptr<DBStatusAdapter> statusAdapter = std::make_shared<DBStatusAdapter>();
1448 ASSERT_NE(statusAdapter, nullptr);
1449 auto adapterStub = std::make_shared<AdapterStub>("");
1450 IAdapter *adapterPtr = adapterStub.get();
1451 ASSERT_NE(adapterPtr, nullptr);
1452 auto aggregator = std::make_unique<CommunicatorAggregator>();
1453 ASSERT_NE(aggregator, nullptr);
1454 EXPECT_EQ(aggregator->Initialize(adapterPtr, statusAdapter), E_OK);
1455 DistributedDB::SerialBuffer *inBuff = new (std::nothrow) SerialBuffer();
1456 ASSERT_NE(inBuff, nullptr);
1457 EXPECT_EQ(aggregator->ScheduleSendTask(dstTarget, inBuff, inType, config, onEnd), E_OK);
1458 inBuff = nullptr; // inBuff was deleted in ScheduleSendTask func
1459 aggregator->Finalize();
1460 }
1461
1462 /**
1463 * @tc.name: DoOnSendEndByTaskIfNeedTest002
1464 * @tc.desc: Test DoOnSendEndByTaskIfNeed function
1465 * @tc.type: FUNC
1466 * @tc.require:
1467 * @tc.author: tiansimiao
1468 */
1469 HWTEST_F(DistributedDBCommunicatorDeepTest, DoOnSendEndByTaskIfNeedTest002, TestSize.Level1)
1470 {
1471 std::string dstTarget = DEVICE_NAME_B;
1472 FrameType inType = FrameType::APPLICATION_MESSAGE;
1473 TaskConfig config;
1474 config.nonBlock = true;
1475 config.isRetryTask = false;
1476 config.timeout = 1000;
__anon5d3d689c2c02(int result, bool isDirectEnd) 1477 OnSendEnd onEnd = [](int result, bool isDirectEnd) {
1478 LOGD("OnSendEnd called with result: %d, isDirectEnd: %d", result, isDirectEnd);
1479 };
1480 const std::shared_ptr<DBStatusAdapter> statusAdapter = std::make_shared<DBStatusAdapter>();
1481 ASSERT_NE(statusAdapter, nullptr);
1482 auto adapterStub = std::make_shared<AdapterStub>("");
1483 IAdapter *adapterPtr = adapterStub.get();
1484 ASSERT_NE(adapterPtr, nullptr);
1485 auto aggregator = std::make_unique<CommunicatorAggregator>();
1486 ASSERT_NE(aggregator, nullptr);
1487 EXPECT_EQ(aggregator->Initialize(adapterPtr, statusAdapter), E_OK);
1488 DistributedDB::SerialBuffer *inBuff = new (std::nothrow) SerialBuffer();
1489 ASSERT_NE(inBuff, nullptr);
1490 EXPECT_EQ(aggregator->ScheduleSendTask(dstTarget, inBuff, inType, config, onEnd), E_OK);
1491 inBuff = nullptr; // inBuff was deleted in ScheduleSendTask func
1492 aggregator->Finalize();
1493 }
1494