1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17 #include <thread>
18 #include "db_errno.h"
19 #include "distributeddb_communicator_common.h"
20 #include "distributeddb_tools_unit_test.h"
21 #include "log_print.h"
22 #include "message.h"
23 #include "protocol_proto.h"
24 #include "time_sync.h"
25 #include "sync_types.h"
26
27 using namespace std;
28 using namespace testing::ext;
29 using namespace DistributedDB;
30
31 namespace {
32 constexpr int SEND_COUNT_GOAL = 20; // Send 20 times
33
34 EnvHandle g_envDeviceA;
35 EnvHandle g_envDeviceB;
36 ICommunicator *g_commAA = nullptr;
37 ICommunicator *g_commBA = nullptr;
38 ICommunicator *g_commBB = nullptr;
39 }
40
41 class DistributedDBCommunicatorSendReceiveTest : public testing::Test {
42 public:
43 static void SetUpTestCase(void);
44 static void TearDownTestCase(void);
45 void SetUp();
46 void TearDown();
47 };
48
SetUpTestCase(void)49 void DistributedDBCommunicatorSendReceiveTest::SetUpTestCase(void)
50 {
51 /**
52 * @tc.setup: Create and init CommunicatorAggregator and AdapterStub
53 */
54 LOGI("[UT][SendRecvTest][SetUpTestCase] Enter.");
55 bool errCode = SetUpEnv(g_envDeviceA, DEVICE_NAME_A);
56 ASSERT_EQ(errCode, true);
57 errCode = SetUpEnv(g_envDeviceB, DEVICE_NAME_B);
58 ASSERT_EQ(errCode, true);
59 DoRegTransformFunction();
60 CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
61 }
62
TearDownTestCase(void)63 void DistributedDBCommunicatorSendReceiveTest::TearDownTestCase(void)
64 {
65 /**
66 * @tc.teardown: Finalize and release CommunicatorAggregator and AdapterStub
67 */
68 LOGI("[UT][SendRecvTest][TearDownTestCase] Enter.");
69 std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure all thread quiet and memory released
70 TearDownEnv(g_envDeviceA);
71 TearDownEnv(g_envDeviceB);
72 CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
73 }
74
GetCommunicator(uint64_t label,const std::string & userId,EnvHandle & device,ICommunicator ** comm)75 static void GetCommunicator(uint64_t label, const std::string &userId, EnvHandle &device, ICommunicator **comm)
76 {
77 int errorNo = E_OK;
78 *comm = device.commAggrHandle->AllocCommunicator(label, errorNo, userId);
79 ASSERT_EQ(errorNo, E_OK);
80 ASSERT_NOT_NULL_AND_ACTIVATE(*comm, userId);
81 }
82
SetUp()83 void DistributedDBCommunicatorSendReceiveTest::SetUp()
84 {
85 DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
86 /**
87 * @tc.setup: Alloc communicator AA, BA, BB
88 */
89 GetCommunicator(LABEL_A, "", g_envDeviceA, &g_commAA);
90 GetCommunicator(LABEL_A, "", g_envDeviceB, &g_commBA);
91 GetCommunicator(LABEL_B, "", g_envDeviceB, &g_commBB);
92 }
93
TearDown()94 void DistributedDBCommunicatorSendReceiveTest::TearDown()
95 {
96 /**
97 * @tc.teardown: Release communicator AA, BA, BB
98 */
99 g_envDeviceA.commAggrHandle->ReleaseCommunicator(g_commAA);
100 g_commAA = nullptr;
101 g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBA);
102 g_commBA = nullptr;
103 g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBB);
104 g_commBA = nullptr;
105 std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure all thread quiet
106 }
107
BuildAppLayerFrameMessage()108 static Message *BuildAppLayerFrameMessage()
109 {
110 DistributedDBUnitTest::DataSyncMessageInfo info;
111 info.messageId_ = DistributedDB::TIME_SYNC_MESSAGE;
112 info.messageType_ = TYPE_REQUEST;
113 DistributedDB::Message *message = nullptr;
114 DistributedDBUnitTest::DistributedDBToolsUnitTest::BuildMessage(info, message);
115 return message;
116 }
117
CheckRecvMessage(Message * recvMsg,bool isEmpty,uint32_t msgId,uint32_t msgType)118 static void CheckRecvMessage(Message *recvMsg, bool isEmpty, uint32_t msgId, uint32_t msgType)
119 {
120 if (isEmpty) {
121 EXPECT_EQ(recvMsg, nullptr);
122 } else {
123 ASSERT_NE(recvMsg, nullptr);
124 EXPECT_EQ(recvMsg->GetMessageId(), msgId);
125 EXPECT_EQ(recvMsg->GetMessageType(), msgType);
126 EXPECT_EQ(recvMsg->GetSessionId(), FIXED_SESSIONID);
127 EXPECT_EQ(recvMsg->GetSequenceId(), FIXED_SEQUENCEID);
128 EXPECT_EQ(recvMsg->GetErrorNo(), NO_ERROR);
129 delete recvMsg;
130 recvMsg = nullptr;
131 }
132 }
133
134 #define REG_MESSAGE_CALLBACK(src, label) \
135 string srcTargetFor##src##label; \
136 Message *recvMsgFor##src##label = nullptr; \
137 g_comm##src##label->RegOnMessageCallback( \
138 [&srcTargetFor##src##label, &recvMsgFor##src##label](const std::string &srcTarget, Message *inMsg) { \
139 srcTargetFor##src##label = srcTarget; \
140 recvMsgFor##src##label = inMsg; \
141 return E_OK; \
142 }, nullptr);
143
144 /**
145 * @tc.name: Send And Receive 001
146 * @tc.desc: Test send and receive based on equipment communicator
147 * @tc.type: FUNC
148 * @tc.require:
149 * @tc.author: xiaozhenjian
150 */
151 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendAndReceive001, TestSize.Level1)
152 {
153 // Preset
154 REG_MESSAGE_CALLBACK(A, A);
155 REG_MESSAGE_CALLBACK(B, A);
156 REG_MESSAGE_CALLBACK(B, B);
157
158 /**
159 * @tc.steps: step1. connect device A with device B
160 */
161 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
162
163 /**
164 * @tc.steps: step2. device A send message(registered and tiny) to device B using communicator AA
165 * @tc.expected: step2. communicator BA received the message
166 */
167 Message *msgForAA = BuildRegedTinyMessage();
168 ASSERT_NE(msgForAA, nullptr);
169 SendConfig conf = {false, false, true, 0, {}};
170 int errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
171 EXPECT_EQ(errCode, E_OK);
172 std::this_thread::sleep_for(std::chrono::milliseconds(200)); // sleep 200 ms
173 CheckRecvMessage(recvMsgForBB, true, 0, 0);
174 EXPECT_EQ(srcTargetForBA, DEVICE_NAME_A);
175 CheckRecvMessage(recvMsgForBA, false, REGED_TINY_MSG_ID, TYPE_REQUEST);
176
177 /**
178 * @tc.steps: step3. device B send message(registered and tiny) to device A using communicator BB
179 * @tc.expected: step3. communicator AA did not receive the message
180 */
181 Message *msgForBB = BuildRegedTinyMessage();
182 ASSERT_NE(msgForBB, nullptr);
183 conf = {true, 0};
184 errCode = g_commBB->SendMessage(DEVICE_NAME_A, msgForBB, conf);
185 EXPECT_EQ(errCode, E_OK);
186 std::this_thread::sleep_for(std::chrono::milliseconds(100));
187 EXPECT_EQ(srcTargetForAA, "");
188
189 // CleanUp
190 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
191 }
192
193 /**
194 * @tc.name: Send And Receive 002
195 * @tc.desc: Test send oversize message will fail
196 * @tc.type: FUNC
197 * @tc.require:
198 * @tc.author: xiaozhenjian
199 */
200 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendAndReceive002, TestSize.Level1)
201 {
202 /**
203 * @tc.steps: step1. connect device A with device B
204 */
205 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
206
207 /**
208 * @tc.steps: step2. device A send message(registered and oversize) to device B using communicator AA
209 * @tc.expected: step2. send fail
210 */
211 Message *msgForAA = BuildRegedOverSizeMessage();
212 ASSERT_NE(msgForAA, nullptr);
213 SendConfig conf = {true, false, true, 0};
214 int errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
215 EXPECT_NE(errCode, E_OK);
216 delete msgForAA;
217 msgForAA = nullptr;
218
219 // CleanUp
220 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
221 }
222
223 /**
224 * @tc.name: Send And Receive 003
225 * @tc.desc: Test send unregistered message will fail
226 * @tc.type: FUNC
227 * @tc.require:
228 * @tc.author: xiaozhenjian
229 */
230 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendAndReceive003, TestSize.Level1)
231 {
232 /**
233 * @tc.steps: step1. connect device A with device B
234 */
235 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
236
237 /**
238 * @tc.steps: step2. device A send message(unregistered and tiny) to device B using communicator AA
239 * @tc.expected: step2. send fail
240 */
241 Message *msgForAA = BuildUnRegedTinyMessage();
242 ASSERT_NE(msgForAA, nullptr);
243 SendConfig conf = {true, false, true, 0};
244 int errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
245 EXPECT_NE(errCode, E_OK);
246 delete msgForAA;
247 msgForAA = nullptr;
248
249 // CleanUp
250 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
251 }
252
253 /**
254 * @tc.name: Send And Receive 004
255 * @tc.desc: Test send and receive with different users.
256 * @tc.type: FUNC
257 * @tc.require:
258 * @tc.author: liaoyonghuang
259 */
260 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendAndReceive004, TestSize.Level1)
261 {
262 /**
263 * @tc.steps: step1. Get communicators for users {"", "user_1", "user_2"}
264 * @tc.expected: step1. ok
265 */
266 ICommunicator *g_commAAUser1 = nullptr;
267 GetCommunicator(LABEL_A, USER_ID_1, g_envDeviceA, &g_commAAUser1);
268 ICommunicator *g_commBAUser1 = nullptr;
269 GetCommunicator(LABEL_A, USER_ID_1, g_envDeviceB, &g_commBAUser1);
270
271 ICommunicator *g_commAAUser2 = nullptr;
272 GetCommunicator(LABEL_A, USER_ID_2, g_envDeviceA, &g_commAAUser2);
273 ICommunicator *g_commBAUser2 = nullptr;
274 GetCommunicator(LABEL_A, USER_ID_2, g_envDeviceB, &g_commBAUser2);
275
276 /**
277 * @tc.steps: step2. Set callback on B, save all message from A
278 * @tc.expected: step2. ok
279 */
280 REG_MESSAGE_CALLBACK(B, A)
281 REG_MESSAGE_CALLBACK(B, AUser1)
282 REG_MESSAGE_CALLBACK(B, AUser2)
283
284 /**
285 * @tc.steps: step3. Connect and send message from A to B.
286 * @tc.expected: step3. ok
287 */
288 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
289
290 Message *msgForAA = BuildRegedTinyMessage();
291 ASSERT_NE(msgForAA, nullptr);
292 Message *msgForAAUser1 = BuildRegedHugeMessage();
293 ASSERT_NE(msgForAAUser1, nullptr);
294 Message *msgForAAUser2 = BuildRegedGiantMessage(HUGE_SIZE + HUGE_SIZE);
295 ASSERT_NE(msgForAAUser2, nullptr);
296 SendConfig conf = {false, false, true, 0};
297 int errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
298 EXPECT_EQ(errCode, E_OK);
299 SendConfig confUser1 = {false, true, true, 0, {"appId", "storeId", USER_ID_1, "DeviceB", ""}};
300 errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAAUser1, confUser1);
301 EXPECT_EQ(errCode, E_OK);
302 SendConfig confUser2 = {false, true, true, 0, {"appId", "storeId", USER_ID_2, "DeviceB", ""}};
303 errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAAUser2, confUser2);
304 EXPECT_EQ(errCode, E_OK);
305 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
306 /**
307 * @tc.steps: step4. Check message.
308 * @tc.expected: step4. ok
309 */
310 EXPECT_EQ(srcTargetForBA, DEVICE_NAME_A);
311 EXPECT_EQ(srcTargetForBAUser1, DEVICE_NAME_A);
312 EXPECT_EQ(srcTargetForBAUser2, DEVICE_NAME_A);
313 CheckRecvMessage(recvMsgForBA, false, REGED_TINY_MSG_ID, TYPE_REQUEST);
314 CheckRecvMessage(recvMsgForBAUser1, false, REGED_HUGE_MSG_ID, TYPE_RESPONSE);
315 CheckRecvMessage(recvMsgForBAUser2, false, REGED_GIANT_MSG_ID, TYPE_NOTIFY);
316 // CleanUp
317 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
318 g_envDeviceA.commAggrHandle->ReleaseCommunicator(g_commAAUser1, USER_ID_1);
319 g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBAUser1, USER_ID_1);
320 g_envDeviceA.commAggrHandle->ReleaseCommunicator(g_commAAUser2, USER_ID_2);
321 g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBAUser2, USER_ID_2);
322 }
323
324 /**
325 * @tc.name: Send And Receive 005
326 * @tc.desc: Test send when db closing
327 * @tc.type: FUNC
328 * @tc.require:
329 * @tc.author: bty
330 */
331 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendAndReceive005, TestSize.Level1)
332 {
333 // Preset
334 REG_MESSAGE_CALLBACK(A, A);
335 REG_MESSAGE_CALLBACK(B, A);
336 REG_MESSAGE_CALLBACK(B, B);
337
338 /**
339 * @tc.steps: step1. connect device A with device B
340 */
341 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
342 g_commBA->ExchangeClosePending(true);
343 CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
344
345 /**
346 * @tc.steps: step2. device A send message(registered and tiny) to device B using communicator AA
347 * @tc.expected: step2. communicator AA received the message
348 */
349 Message *msgForAA = BuildRegedTinyMessage();
350 ASSERT_NE(msgForAA, nullptr);
351 SendConfig conf = {false, false, true, 0, {}};
352 int errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
353 EXPECT_EQ(errCode, E_OK);
354 std::this_thread::sleep_for(std::chrono::milliseconds(200)); // sleep 200 ms
355 EXPECT_EQ(recvMsgForAA->GetErrorNo(), E_FEEDBACK_DB_CLOSING);
356 delete recvMsgForAA;
357 recvMsgForAA = nullptr;
358
359 // CleanUp
360 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
361 CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
362 }
363
364 /**
365 * @tc.name: Send Flow Control 001
366 * @tc.desc: Test send in nonblock way
367 * @tc.type: FUNC
368 * @tc.require:
369 * @tc.author: xiaozhenjian
370 */
371 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendFlowControl001, TestSize.Level1)
372 {
373 // Preset
374 int countForBA = 0;
375 int countForBB = 0;
__anonf5bf6e8a0202()376 g_commBA->RegOnSendableCallback([&countForBA](){ countForBA++; }, nullptr);
__anonf5bf6e8a0302()377 g_commBB->RegOnSendableCallback([&countForBB](){ countForBB++; }, nullptr);
378
379 /**
380 * @tc.steps: step1. connect device A with device B
381 */
382 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
383 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms to make sure send cause by online done
384 countForBA = 0;
385 countForBB = 0;
386
387 /**
388 * @tc.steps: step2. device B simulates send block
389 */
390 g_envDeviceB.adapterHandle->SimulateSendBlock();
391
392 /**
393 * @tc.steps: step3. device B send as much as possible message(unregistered and huge) in nonblock way
394 * to device A using communicator BA until send fail;
395 * @tc.expected: step3. send fail will happen.
396 */
397 int sendCount = 0;
398 while (true) {
399 Message *msgForBA = BuildRegedHugeMessage();
400 ASSERT_NE(msgForBA, nullptr);
401 SendConfig conf = {true, false, true, 0};
402 int errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
403 if (errCode == E_OK) {
404 sendCount++;
405 } else {
406 delete msgForBA;
407 msgForBA = nullptr;
408 break;
409 }
410 }
411
412 /**
413 * @tc.steps: step4. device B simulates send block terminate
414 * @tc.expected: step4. send count before fail is equal as expected. sendable callback happened.
415 */
416 g_envDeviceB.adapterHandle->SimulateSendBlockClear();
417 int expectSendCount = MAX_CAPACITY / (HUGE_SIZE + HEADER_SIZE) +
418 (MAX_CAPACITY % (HUGE_SIZE + HEADER_SIZE) == 0 ? 0 : 1);
419 EXPECT_EQ(sendCount, expectSendCount);
420 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
421 EXPECT_GE(countForBA, 1);
422 EXPECT_GE(countForBB, 1);
423
424 // CleanUp
425 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
426 }
427
428 /**
429 * @tc.name: Send Flow Control 002
430 * @tc.desc: Test send in block(without timeout) way
431 * @tc.type: FUNC
432 * @tc.require:
433 * @tc.author: xiaozhenjian
434 */
435 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendFlowControl002, TestSize.Level1)
436 {
437 // Preset
438 int cntForBA = 0;
439 int cntForBB = 0;
__anonf5bf6e8a0402()440 g_commBA->RegOnSendableCallback([&cntForBA](){ cntForBA++; }, nullptr);
__anonf5bf6e8a0502()441 g_commBB->RegOnSendableCallback([&cntForBB](){ cntForBB++; }, nullptr);
442
443 /**
444 * @tc.steps: step1. connect device A with device B
445 */
446 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
447 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms to make sure send cause by online done
448 cntForBA = 0;
449 cntForBB = 0;
450
451 /**
452 * @tc.steps: step2. device B simulates send block
453 */
454 g_envDeviceB.adapterHandle->SimulateSendBlock();
455
456 /**
457 * @tc.steps: step3. device B send a certain message(unregistered and huge) in block way
458 * without timeout to device A using communicator BA;
459 */
460 int sendCount = 0;
461 int sendFailCount = 0;
__anonf5bf6e8a0602() 462 std::thread sendThread([&sendCount, &sendFailCount]() {
463 while (sendCount < SEND_COUNT_GOAL) {
464 Message *msgForBA = BuildRegedHugeMessage();
465 ASSERT_NE(msgForBA, nullptr);
466 SendConfig conf = {false, false, true, 0};
467 int errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
468 if (errCode != E_OK) {
469 delete msgForBA;
470 msgForBA = nullptr;
471 sendFailCount++;
472 }
473 sendCount++;
474 }
475 });
476
477 /**
478 * @tc.steps: step4. device B simulates send block terminate
479 * @tc.expected: step4. send fail count is zero. sendable callback happened.
480 */
481 std::this_thread::sleep_for(std::chrono::milliseconds(200));
482 g_envDeviceB.adapterHandle->SimulateSendBlockClear();
483 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
484 sendThread.join();
485 EXPECT_EQ(sendCount, SEND_COUNT_GOAL);
486 EXPECT_EQ(sendFailCount, 0);
487 EXPECT_GE(cntForBA, 1);
488 EXPECT_GE(cntForBB, 1);
489
490 // CleanUp
491 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
492 }
493
494 /**
495 * @tc.name: Send Flow Control 003
496 * @tc.desc: Test send in block(with timeout) way
497 * @tc.type: FUNC
498 * @tc.require:
499 * @tc.author: xiaozhenjian
500 */
501 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendFlowControl003, TestSize.Level1)
502 {
503 // Preset
504 int cntsForBA = 0;
505 int cntsForBB = 0;
__anonf5bf6e8a0702()506 g_commBA->RegOnSendableCallback([&cntsForBA](){ cntsForBA++; }, nullptr);
__anonf5bf6e8a0802()507 g_commBB->RegOnSendableCallback([&cntsForBB](){ cntsForBB++; }, nullptr);
508
509 /**
510 * @tc.steps: step1. connect device A with device B
511 */
512 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
513 std::this_thread::sleep_for(std::chrono::milliseconds(100));
514 cntsForBA = 0;
515 cntsForBB = 0;
516
517 /**
518 * @tc.steps: step2. device B simulates send block
519 */
520 g_envDeviceB.adapterHandle->SimulateSendBlock();
521
522 /**
523 * @tc.steps: step3. device B send a certain message(unregistered and huge) in block way
524 * with timeout to device A using communicator BA;
525 */
526 int sendCnt = 0;
527 int sendFailCnt = 0;
__anonf5bf6e8a0902() 528 std::thread sendThread([&sendCnt, &sendFailCnt]() {
529 while (sendCnt < SEND_COUNT_GOAL) {
530 Message *msgForBA = BuildRegedHugeMessage();
531 ASSERT_NE(msgForBA, nullptr);
532 SendConfig conf = {false, false, true, 100};
533 int errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf); // 100 ms timeout
534 if (errCode != E_OK) {
535 delete msgForBA;
536 msgForBA = nullptr;
537 sendFailCnt++;
538 }
539 sendCnt++;
540 }
541 });
542
543 /**
544 * @tc.steps: step4. device B simulates send block terminate
545 * @tc.expected: step4. send fail count is no more than expected. sendable callback happened.
546 */
547 std::this_thread::sleep_for(std::chrono::milliseconds(300)); // wait 300 ms
548 g_envDeviceB.adapterHandle->SimulateSendBlockClear();
549 std::this_thread::sleep_for(std::chrono::milliseconds(1200)); // wait 1200 ms
550 sendThread.join();
551 EXPECT_EQ(sendCnt, SEND_COUNT_GOAL);
552 EXPECT_LE(sendFailCnt, 4);
553 EXPECT_GE(cntsForBA, 1);
554 EXPECT_GE(cntsForBB, 1);
555
556 // CleanUp
557 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
558 }
559
560 /**
561 * @tc.name: Receive Check 001
562 * @tc.desc: Receive packet field check
563 * @tc.type: FUNC
564 * @tc.require:
565 * @tc.author: xiaozhenjian
566 */
567 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, ReceiveCheck001, TestSize.Level1)
568 {
569 // Preset
570 int recvCount = 0;
__anonf5bf6e8a0a02(const std::string &srcTarget, Message *inMsg) 571 g_commAA->RegOnMessageCallback([&recvCount](const std::string &srcTarget, Message *inMsg) {
572 recvCount++;
573 if (inMsg != nullptr) {
574 delete inMsg;
575 inMsg = nullptr;
576 }
577 return E_OK;
578 }, nullptr);
579 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
580
581 /**
582 * @tc.steps: step1. create packet with magic field error
583 * @tc.expected: step1. no message callback
584 */
585 g_envDeviceB.adapterHandle->SimulateSendBitErrorInMagicField(true, 0xFFFF);
586 Message *msgForBA = BuildRegedTinyMessage();
587 SendConfig conf = {true, false, true, 0};
588 int errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
589 EXPECT_EQ(errCode, E_OK);
590 std::this_thread::sleep_for(std::chrono::milliseconds(100));
591 EXPECT_EQ(recvCount, 0);
592 g_envDeviceB.adapterHandle->SimulateSendBitErrorInMagicField(false, 0);
593
594 /**
595 * @tc.steps: step2. create packet with version field error
596 * @tc.expected: step2. no message callback
597 */
598 g_envDeviceB.adapterHandle->SimulateSendBitErrorInVersionField(true, 0xFFFF);
599 msgForBA = BuildRegedTinyMessage();
600 errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
601 EXPECT_EQ(errCode, E_OK);
602 std::this_thread::sleep_for(std::chrono::milliseconds(100));
603 EXPECT_EQ(recvCount, 0);
604 g_envDeviceB.adapterHandle->SimulateSendBitErrorInVersionField(false, 0);
605
606 /**
607 * @tc.steps: step3. create packet with checksum field error
608 * @tc.expected: step3. no message callback
609 */
610 g_envDeviceB.adapterHandle->SimulateSendBitErrorInCheckSumField(true, 0xFFFF);
611 msgForBA = BuildRegedTinyMessage();
612 errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
613 EXPECT_EQ(errCode, E_OK);
614 std::this_thread::sleep_for(std::chrono::milliseconds(100));
615 EXPECT_EQ(recvCount, 0);
616 g_envDeviceB.adapterHandle->SimulateSendBitErrorInCheckSumField(false, 0);
617
618 // CleanUp
619 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
620 }
621
622 /**
623 * @tc.name: Receive Check 002
624 * @tc.desc: Receive packet field check
625 * @tc.type: FUNC
626 * @tc.require:
627 * @tc.author: xiaozhenjian
628 */
629 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, ReceiveCheck002, TestSize.Level1)
630 {
631 // Preset
632 int recvCount = 0;
__anonf5bf6e8a0b02(const std::string &srcTarget, Message *inMsg) 633 g_commAA->RegOnMessageCallback([&recvCount](const std::string &srcTarget, Message *inMsg) {
634 recvCount++;
635 if (inMsg != nullptr) {
636 delete inMsg;
637 inMsg = nullptr;
638 }
639 return E_OK;
640 }, nullptr);
641 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
642
643 /**
644 * @tc.steps: step1. create packet with packetLen field error
645 * @tc.expected: step1. no message callback
646 */
647 g_envDeviceB.adapterHandle->SimulateSendBitErrorInPacketLenField(true, 0xFFFF);
648 Message *msgForBA = BuildRegedTinyMessage();
649 SendConfig conf = {true, false, true, 0};
650 int errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
651 EXPECT_EQ(errCode, E_OK);
652 std::this_thread::sleep_for(std::chrono::milliseconds(100));
653 EXPECT_EQ(recvCount, 0);
654 g_envDeviceB.adapterHandle->SimulateSendBitErrorInPacketLenField(false, 0);
655
656 /**
657 * @tc.steps: step1. create packet with packetType field error
658 * @tc.expected: step1. no message callback
659 */
660 g_envDeviceB.adapterHandle->SimulateSendBitErrorInPacketTypeField(true, 0xFF);
661 msgForBA = BuildRegedTinyMessage();
662 errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
663 EXPECT_EQ(errCode, E_OK);
664 std::this_thread::sleep_for(std::chrono::milliseconds(100));
665 EXPECT_EQ(recvCount, 0);
666 g_envDeviceB.adapterHandle->SimulateSendBitErrorInPacketTypeField(false, 0);
667
668 /**
669 * @tc.steps: step1. create packet with paddingLen field error
670 * @tc.expected: step1. no message callback
671 */
672 g_envDeviceB.adapterHandle->SimulateSendBitErrorInPaddingLenField(true, 0xFF);
673 msgForBA = BuildRegedTinyMessage();
674 errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
675 EXPECT_EQ(errCode, E_OK);
676 std::this_thread::sleep_for(std::chrono::milliseconds(100));
677 EXPECT_EQ(recvCount, 0);
678 g_envDeviceB.adapterHandle->SimulateSendBitErrorInPaddingLenField(false, 0);
679
680 // CleanUp
681 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
682 }
683
684 /**
685 * @tc.name: Send Result Notify 001
686 * @tc.desc: Test send result notify
687 * @tc.type: FUNC
688 * @tc.require:
689 * @tc.author: xiaozhenjian
690 */
691 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendResultNotify001, TestSize.Level1)
692 {
693 // preset
694 std::vector<int> sendResult;
__anonf5bf6e8a0c02(int result, bool isDirectEnd) 695 auto sendResultNotifier = [&sendResult](int result, bool isDirectEnd) {
696 sendResult.push_back(result);
697 };
698
699 /**
700 * @tc.steps: step1. connect device A with device B
701 */
702 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
703
704 /**
705 * @tc.steps: step2. device A send message to device B using communicator AA
706 * @tc.expected: step2. notify send done and success
707 */
708 Message *msgForAA = BuildRegedTinyMessage();
709 ASSERT_NE(msgForAA, nullptr);
710 SendConfig conf = {false, false, true, 0};
711 int errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf, sendResultNotifier);
712 EXPECT_EQ(errCode, E_OK);
713 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
714 ASSERT_EQ(sendResult.size(), static_cast<size_t>(1)); // 1 notify
715 EXPECT_EQ(sendResult[0], E_OK);
716
717 /**
718 * @tc.steps: step3. disconnect device A with device B
719 */
720 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
721
722 /**
723 * @tc.steps: step4. device A send message to device B using communicator AA
724 * @tc.expected: step2. notify send done and fail
725 */
726 msgForAA = BuildRegedTinyMessage();
727 ASSERT_NE(msgForAA, nullptr);
728 errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf, sendResultNotifier);
729 EXPECT_EQ(errCode, E_OK);
730 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
731 ASSERT_EQ(sendResult.size(), static_cast<size_t>(2)); // 2 notify
732 EXPECT_NE(sendResult[1], E_OK); // 1 for second element
733 }
734
735 /**
736 * @tc.name: Message Feedback 001
737 * @tc.desc: Test feedback not support messageid and communicator not found
738 * @tc.type: FUNC
739 * @tc.require:
740 * @tc.author: xiaozhenjian
741 */
742 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, MessageFeedback001, TestSize.Level1)
743 {
744 CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
745 // preset
746 REG_MESSAGE_CALLBACK(A, A);
747 REG_MESSAGE_CALLBACK(B, A);
748 REG_MESSAGE_CALLBACK(B, B);
749
750 /**
751 * @tc.steps: step1. connect device A with device B
752 */
753 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
754
755 /**
756 * @tc.steps: step2. device B send message to device A using communicator BB
757 * @tc.expected: step2. communicator BB receive communicator not found feedback
758 */
759 Message *msgForBB = BuildRegedTinyMessage();
760 ASSERT_NE(msgForBB, nullptr);
761 SendConfig conf = {false, false, true, 0};
762 int errCode = g_commBB->SendMessage(DEVICE_NAME_A, msgForBB, conf);
763 EXPECT_EQ(errCode, E_OK);
764 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
765 ASSERT_NE(recvMsgForBB, nullptr);
766 EXPECT_EQ(srcTargetForBB, DEVICE_NAME_A);
767 EXPECT_EQ(recvMsgForBB->GetMessageId(), REGED_TINY_MSG_ID);
768 EXPECT_EQ(recvMsgForBB->GetMessageType(), TYPE_RESPONSE);
769 EXPECT_EQ(recvMsgForBB->GetSessionId(), FIXED_SESSIONID);
770 EXPECT_EQ(recvMsgForBB->GetSequenceId(), FIXED_SEQUENCEID);
771 EXPECT_EQ(recvMsgForBB->GetErrorNo(), static_cast<uint32_t>(E_FEEDBACK_COMMUNICATOR_NOT_FOUND));
772 EXPECT_EQ(recvMsgForBB->GetObject<RegedTinyObject>(), nullptr);
773 delete recvMsgForBB;
774 recvMsgForBB = nullptr;
775
776 /**
777 * @tc.steps: step3. simulate messageid not registered
778 */
779 g_envDeviceB.adapterHandle->SimulateSendBitErrorInMessageIdField(true, UNREGED_TINY_MSG_ID);
780
781 /**
782 * @tc.steps: step4. device B send message to device A using communicator BA
783 * @tc.expected: step4. communicator BA receive messageid not register feedback
784 */
785 Message *msgForBA = BuildRegedTinyMessage();
786 ASSERT_NE(msgForBA, nullptr);
787 errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
788 EXPECT_EQ(errCode, E_OK);
789 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
790 ASSERT_NE(recvMsgForBA, nullptr);
791 EXPECT_EQ(srcTargetForBA, DEVICE_NAME_A);
792 EXPECT_EQ(recvMsgForBA->GetMessageId(), UNREGED_TINY_MSG_ID);
793 EXPECT_EQ(recvMsgForBA->GetMessageType(), TYPE_RESPONSE);
794 EXPECT_EQ(recvMsgForBA->GetSessionId(), FIXED_SESSIONID);
795 EXPECT_EQ(recvMsgForBA->GetSequenceId(), FIXED_SEQUENCEID);
796 EXPECT_EQ(recvMsgForBA->GetErrorNo(), static_cast<uint32_t>(E_FEEDBACK_UNKNOWN_MESSAGE));
797 EXPECT_EQ(recvMsgForBA->GetObject<RegedTinyObject>(), nullptr);
798 delete recvMsgForBA;
799 recvMsgForBA = nullptr;
800
801 // CleanUp
802 g_envDeviceB.adapterHandle->SimulateSendBitErrorInMessageIdField(false, 0);
803 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
804 CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
805 }
806
807 /**
808 * @tc.name: SendAndReceiveWithExtendHead001
809 * @tc.desc: Test fill extendHead func
810 * @tc.type: FUNC
811 * @tc.require:
812 * @tc.author: zhuwentao
813 */
814 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendAndReceiveWithExtendHead001, TestSize.Level1)
815 {
816 // Preset
817 TimeSync::RegisterTransformFunc();
818 REG_MESSAGE_CALLBACK(A, A);
819 REG_MESSAGE_CALLBACK(B, A);
820
821 /**
822 * @tc.steps: step1. connect device A with device B
823 */
824 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
825
826 /**
827 * @tc.steps: step2. device A send ApplayerFrameMessage to device B using communicator AA with extednHead
828 * @tc.expected: step2. communicator BA received the message
829 */
830 Message *msgForAA = BuildAppLayerFrameMessage();
831 ASSERT_NE(msgForAA, nullptr);
832 UserInfo userInfo = {"", "userId"};
833 g_envDeviceB.adapterHandle->SetUserInfo({userInfo});
834 SendConfig conf = {false, true, true, 0, {"appId", "storeId", "", "DeviceB"}};
835 int errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
836 EXPECT_EQ(errCode, E_OK);
837 std::this_thread::sleep_for(std::chrono::milliseconds(200)); // sleep 200 ms
838 EXPECT_EQ(srcTargetForBA, DEVICE_NAME_A);
839 ASSERT_NE(recvMsgForBA, nullptr);
840 delete recvMsgForBA;
841 recvMsgForBA = nullptr;
842 DistributedDB::ProtocolProto::UnRegTransformFunction(DistributedDB::TIME_SYNC_MESSAGE);
843 // CleanUp
844 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
845 }
846
847 /**
848 * @tc.name: GetDataUserInfoTest001
849 * @tc.desc: Test GetDataUserInfo return NEED_CORRECT_TARGET_USER
850 * @tc.type: FUNC
851 * @tc.require:
852 * @tc.author: liaoyonghuang
853 */
854 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, GetDataUserInfoTest001, TestSize.Level1)
855 {
856 // Preset
857 TimeSync::RegisterTransformFunc();
858 REG_MESSAGE_CALLBACK(A, A);
859 REG_MESSAGE_CALLBACK(B, A);
860
861 /**
862 * @tc.steps: step1. connect device A with device B
863 */
864 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
865
866 /**
867 * @tc.steps: step2. return NEED_CORRECT_TARGET_USER when get data user info
868 */
869 std::shared_ptr<ProcessCommunicatorTestStub> processCommunicator = std::make_shared<ProcessCommunicatorTestStub>();
870 processCommunicator->SetGetDataUserInfoRet(NEED_CORRECT_TARGET_USER);
871 g_envDeviceB.adapterHandle->SetProcessCommunicator(processCommunicator);
872 /**
873 * @tc.steps: step3. device A send ApplayerFrameMessage to device B using communicator AA with extednHead
874 * @tc.expected: step3. communicator AA received the message, errNo is E_NEED_CORRECT_TARGET_USER
875 */
876 Message *msgForAA = BuildAppLayerFrameMessage();
877 ASSERT_NE(msgForAA, nullptr);
878 UserInfo userInfo = {"", "userId"};
879 g_envDeviceB.adapterHandle->SetUserInfo({userInfo});
880 SendConfig conf = {false, true, true, 0, {"appId", "storeId", "", "DeviceB"}};
881 int errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
882 EXPECT_EQ(errCode, E_OK);
883 std::this_thread::sleep_for(std::chrono::milliseconds(200)); // sleep 200 ms
884 ASSERT_NE(recvMsgForAA, nullptr);
885 EXPECT_EQ(recvMsgForAA->GetErrorNo(), E_NEED_CORRECT_TARGET_USER);
886 delete recvMsgForAA;
887 recvMsgForAA = nullptr;
888 DistributedDB::ProtocolProto::UnRegTransformFunction(DistributedDB::TIME_SYNC_MESSAGE);
889 // CleanUp
890 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
891 }
892
893 /**
894 * @tc.name: ToSerialBufferTest001
895 * @tc.desc: Test ToSerialBuffer func
896 * @tc.type: FUNC
897 * @tc.require:
898 * @tc.author: tiansimiao
899 */
900 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, ToSerialBufferTest001, TestSize.Level1)
901 {
902 std::shared_ptr<ExtendHeaderHandle> extendHandle;
903 int errorNo = E_OK;
904 SerialBuffer* buffer = ProtocolProto::ToSerialBuffer(nullptr, extendHandle, false, errorNo);
905 EXPECT_EQ(errorNo, -E_INVALID_ARGS);
906 EXPECT_EQ(buffer, nullptr);
907 }
908
909 /**
910 * @tc.name: ToMessageTest001
911 * @tc.desc: Test ToMessage func
912 * @tc.type: FUNC
913 * @tc.require:
914 * @tc.author: tiansimiao
915 */
916 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, ToMessageTest001, TestSize.Level1)
917 {
918 int errorNo = E_OK;
919 Message* msg = ProtocolProto::ToMessage(nullptr, errorNo, false);
920 EXPECT_EQ(errorNo, -E_INVALID_ARGS);
921 EXPECT_EQ(msg, nullptr);
922 }