• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 #include <thread>
18 #include "db_errno.h"
19 #include "distributeddb_communicator_common.h"
20 #include "distributeddb_tools_unit_test.h"
21 #include "log_print.h"
22 #include "message.h"
23 #include "protocol_proto.h"
24 #include "time_sync.h"
25 #include "sync_types.h"
26 
27 using namespace std;
28 using namespace testing::ext;
29 using namespace DistributedDB;
30 
31 namespace {
32     constexpr int SEND_COUNT_GOAL = 20; // Send 20 times
33 
34     EnvHandle g_envDeviceA;
35     EnvHandle g_envDeviceB;
36     ICommunicator *g_commAA = nullptr;
37     ICommunicator *g_commBA = nullptr;
38     ICommunicator *g_commBB = nullptr;
39 }
40 
41 class DistributedDBCommunicatorSendReceiveTest : public testing::Test {
42 public:
43     static void SetUpTestCase(void);
44     static void TearDownTestCase(void);
45     void SetUp();
46     void TearDown();
47 };
48 
SetUpTestCase(void)49 void DistributedDBCommunicatorSendReceiveTest::SetUpTestCase(void)
50 {
51     /**
52      * @tc.setup: Create and init CommunicatorAggregator and AdapterStub
53      */
54     LOGI("[UT][SendRecvTest][SetUpTestCase] Enter.");
55     bool errCode = SetUpEnv(g_envDeviceA, DEVICE_NAME_A);
56     ASSERT_EQ(errCode, true);
57     errCode = SetUpEnv(g_envDeviceB, DEVICE_NAME_B);
58     ASSERT_EQ(errCode, true);
59     DoRegTransformFunction();
60     CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
61 }
62 
TearDownTestCase(void)63 void DistributedDBCommunicatorSendReceiveTest::TearDownTestCase(void)
64 {
65     /**
66      * @tc.teardown: Finalize and release CommunicatorAggregator and AdapterStub
67      */
68     LOGI("[UT][SendRecvTest][TearDownTestCase] Enter.");
69     std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure all thread quiet and memory released
70     TearDownEnv(g_envDeviceA);
71     TearDownEnv(g_envDeviceB);
72     CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
73 }
74 
GetCommunicator(uint64_t label,const std::string & userId,EnvHandle & device,ICommunicator ** comm)75 static void GetCommunicator(uint64_t label, const std::string &userId, EnvHandle &device, ICommunicator **comm)
76 {
77     int errorNo = E_OK;
78     *comm = device.commAggrHandle->AllocCommunicator(label, errorNo, userId);
79     ASSERT_EQ(errorNo, E_OK);
80     ASSERT_NOT_NULL_AND_ACTIVATE(*comm, userId);
81 }
82 
SetUp()83 void DistributedDBCommunicatorSendReceiveTest::SetUp()
84 {
85     DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
86     /**
87      * @tc.setup: Alloc communicator AA, BA, BB
88      */
89     GetCommunicator(LABEL_A, "", g_envDeviceA, &g_commAA);
90     GetCommunicator(LABEL_A, "", g_envDeviceB, &g_commBA);
91     GetCommunicator(LABEL_B, "", g_envDeviceB, &g_commBB);
92 }
93 
TearDown()94 void DistributedDBCommunicatorSendReceiveTest::TearDown()
95 {
96     /**
97      * @tc.teardown: Release communicator AA, BA, BB
98      */
99     g_envDeviceA.commAggrHandle->ReleaseCommunicator(g_commAA);
100     g_commAA = nullptr;
101     g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBA);
102     g_commBA = nullptr;
103     g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBB);
104     g_commBA = nullptr;
105     std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure all thread quiet
106 }
107 
BuildAppLayerFrameMessage()108 static Message *BuildAppLayerFrameMessage()
109 {
110     DistributedDBUnitTest::DataSyncMessageInfo info;
111     info.messageId_ = DistributedDB::TIME_SYNC_MESSAGE;
112     info.messageType_ = TYPE_REQUEST;
113     DistributedDB::Message *message = nullptr;
114     DistributedDBUnitTest::DistributedDBToolsUnitTest::BuildMessage(info, message);
115     return message;
116 }
117 
CheckRecvMessage(Message * recvMsg,bool isEmpty,uint32_t msgId,uint32_t msgType)118 static void CheckRecvMessage(Message *recvMsg, bool isEmpty, uint32_t msgId, uint32_t msgType)
119 {
120     if (isEmpty) {
121         EXPECT_EQ(recvMsg, nullptr);
122     } else {
123         ASSERT_NE(recvMsg, nullptr);
124         EXPECT_EQ(recvMsg->GetMessageId(), msgId);
125         EXPECT_EQ(recvMsg->GetMessageType(), msgType);
126         EXPECT_EQ(recvMsg->GetSessionId(), FIXED_SESSIONID);
127         EXPECT_EQ(recvMsg->GetSequenceId(), FIXED_SEQUENCEID);
128         EXPECT_EQ(recvMsg->GetErrorNo(), NO_ERROR);
129         delete recvMsg;
130         recvMsg = nullptr;
131     }
132 }
133 
134 #define REG_MESSAGE_CALLBACK(src, label) \
135     string srcTargetFor##src##label; \
136     Message *recvMsgFor##src##label = nullptr; \
137     g_comm##src##label->RegOnMessageCallback( \
138         [&srcTargetFor##src##label, &recvMsgFor##src##label](const std::string &srcTarget, Message *inMsg) { \
139         srcTargetFor##src##label = srcTarget; \
140         recvMsgFor##src##label = inMsg; \
141         return E_OK; \
142     }, nullptr);
143 
144 /**
145  * @tc.name: Send And Receive 001
146  * @tc.desc: Test send and receive based on equipment communicator
147  * @tc.type: FUNC
148  * @tc.require:
149  * @tc.author: xiaozhenjian
150  */
151 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendAndReceive001, TestSize.Level1)
152 {
153     // Preset
154     REG_MESSAGE_CALLBACK(A, A);
155     REG_MESSAGE_CALLBACK(B, A);
156     REG_MESSAGE_CALLBACK(B, B);
157 
158     /**
159      * @tc.steps: step1. connect device A with device B
160      */
161     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
162 
163     /**
164      * @tc.steps: step2. device A send message(registered and tiny) to device B using communicator AA
165      * @tc.expected: step2. communicator BA received the message
166      */
167     Message *msgForAA = BuildRegedTinyMessage();
168     ASSERT_NE(msgForAA, nullptr);
169     SendConfig conf = {false, false, true, 0, {}};
170     int errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
171     EXPECT_EQ(errCode, E_OK);
172     std::this_thread::sleep_for(std::chrono::milliseconds(200)); // sleep 200 ms
173     CheckRecvMessage(recvMsgForBB, true, 0, 0);
174     EXPECT_EQ(srcTargetForBA, DEVICE_NAME_A);
175     CheckRecvMessage(recvMsgForBA, false, REGED_TINY_MSG_ID, TYPE_REQUEST);
176 
177     /**
178      * @tc.steps: step3. device B send message(registered and tiny) to device A using communicator BB
179      * @tc.expected: step3. communicator AA did not receive the message
180      */
181     Message *msgForBB = BuildRegedTinyMessage();
182     ASSERT_NE(msgForBB, nullptr);
183     conf = {true, 0};
184     errCode = g_commBB->SendMessage(DEVICE_NAME_A, msgForBB, conf);
185     EXPECT_EQ(errCode, E_OK);
186     std::this_thread::sleep_for(std::chrono::milliseconds(100));
187     EXPECT_EQ(srcTargetForAA, "");
188 
189     // CleanUp
190     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
191 }
192 
193 /**
194  * @tc.name: Send And Receive 002
195  * @tc.desc: Test send oversize message will fail
196  * @tc.type: FUNC
197  * @tc.require:
198  * @tc.author: xiaozhenjian
199  */
200 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendAndReceive002, TestSize.Level1)
201 {
202     /**
203      * @tc.steps: step1. connect device A with device B
204      */
205     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
206 
207     /**
208      * @tc.steps: step2. device A send message(registered and oversize) to device B using communicator AA
209      * @tc.expected: step2. send fail
210      */
211     Message *msgForAA = BuildRegedOverSizeMessage();
212     ASSERT_NE(msgForAA, nullptr);
213     SendConfig conf = {true, false, true, 0};
214     int errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
215     EXPECT_NE(errCode, E_OK);
216     delete msgForAA;
217     msgForAA = nullptr;
218 
219     // CleanUp
220     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
221 }
222 
223 /**
224  * @tc.name: Send And Receive 003
225  * @tc.desc: Test send unregistered message will fail
226  * @tc.type: FUNC
227  * @tc.require:
228  * @tc.author: xiaozhenjian
229  */
230 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendAndReceive003, TestSize.Level1)
231 {
232     /**
233      * @tc.steps: step1. connect device A with device B
234      */
235     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
236 
237     /**
238      * @tc.steps: step2. device A send message(unregistered and tiny) to device B using communicator AA
239      * @tc.expected: step2. send fail
240      */
241     Message *msgForAA = BuildUnRegedTinyMessage();
242     ASSERT_NE(msgForAA, nullptr);
243     SendConfig conf = {true, false, true, 0};
244     int errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
245     EXPECT_NE(errCode, E_OK);
246     delete msgForAA;
247     msgForAA = nullptr;
248 
249     // CleanUp
250     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
251 }
252 
253 /**
254  * @tc.name: Send And Receive 004
255  * @tc.desc: Test send and receive with different users.
256  * @tc.type: FUNC
257  * @tc.require:
258  * @tc.author: liaoyonghuang
259  */
260 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendAndReceive004, TestSize.Level1)
261 {
262     /**
263      * @tc.steps: step1. Get communicators for users {"", "user_1", "user_2"}
264      * @tc.expected: step1. ok
265      */
266     ICommunicator *g_commAAUser1 = nullptr;
267     GetCommunicator(LABEL_A, USER_ID_1, g_envDeviceA, &g_commAAUser1);
268     ICommunicator *g_commBAUser1 = nullptr;
269     GetCommunicator(LABEL_A, USER_ID_1, g_envDeviceB, &g_commBAUser1);
270 
271     ICommunicator *g_commAAUser2 = nullptr;
272     GetCommunicator(LABEL_A, USER_ID_2, g_envDeviceA, &g_commAAUser2);
273     ICommunicator *g_commBAUser2 = nullptr;
274     GetCommunicator(LABEL_A, USER_ID_2, g_envDeviceB, &g_commBAUser2);
275 
276     /**
277      * @tc.steps: step2. Set callback on B, save all message from A
278      * @tc.expected: step2. ok
279      */
280     REG_MESSAGE_CALLBACK(B, A)
281     REG_MESSAGE_CALLBACK(B, AUser1)
282     REG_MESSAGE_CALLBACK(B, AUser2)
283 
284     /**
285      * @tc.steps: step3. Connect and send message from A to B.
286      * @tc.expected: step3. ok
287      */
288     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
289 
290     Message *msgForAA = BuildRegedTinyMessage();
291     ASSERT_NE(msgForAA, nullptr);
292     Message *msgForAAUser1 = BuildRegedHugeMessage();
293     ASSERT_NE(msgForAAUser1, nullptr);
294     Message *msgForAAUser2 = BuildRegedGiantMessage(HUGE_SIZE + HUGE_SIZE);
295     ASSERT_NE(msgForAAUser2, nullptr);
296     SendConfig conf = {false, false, true, 0};
297     int errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
298     EXPECT_EQ(errCode, E_OK);
299     SendConfig confUser1 = {false, true, true, 0, {"appId", "storeId", USER_ID_1, "DeviceB", ""}};
300     errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAAUser1, confUser1);
301     EXPECT_EQ(errCode, E_OK);
302     SendConfig confUser2 = {false, true, true, 0, {"appId", "storeId", USER_ID_2, "DeviceB", ""}};
303     errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAAUser2, confUser2);
304     EXPECT_EQ(errCode, E_OK);
305     std::this_thread::sleep_for(std::chrono::milliseconds(1000));
306     /**
307      * @tc.steps: step4. Check message.
308      * @tc.expected: step4. ok
309      */
310     EXPECT_EQ(srcTargetForBA, DEVICE_NAME_A);
311     EXPECT_EQ(srcTargetForBAUser1, DEVICE_NAME_A);
312     EXPECT_EQ(srcTargetForBAUser2, DEVICE_NAME_A);
313     CheckRecvMessage(recvMsgForBA, false, REGED_TINY_MSG_ID, TYPE_REQUEST);
314     CheckRecvMessage(recvMsgForBAUser1, false, REGED_HUGE_MSG_ID, TYPE_RESPONSE);
315     CheckRecvMessage(recvMsgForBAUser2, false, REGED_GIANT_MSG_ID, TYPE_NOTIFY);
316     // CleanUp
317     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
318     g_envDeviceA.commAggrHandle->ReleaseCommunicator(g_commAAUser1, USER_ID_1);
319     g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBAUser1, USER_ID_1);
320     g_envDeviceA.commAggrHandle->ReleaseCommunicator(g_commAAUser2, USER_ID_2);
321     g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBAUser2, USER_ID_2);
322 }
323 
324 /**
325  * @tc.name: Send And Receive 005
326  * @tc.desc: Test send when db closing
327  * @tc.type: FUNC
328  * @tc.require:
329  * @tc.author: bty
330  */
331 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendAndReceive005, TestSize.Level1)
332 {
333     // Preset
334     REG_MESSAGE_CALLBACK(A, A);
335     REG_MESSAGE_CALLBACK(B, A);
336     REG_MESSAGE_CALLBACK(B, B);
337 
338     /**
339      * @tc.steps: step1. connect device A with device B
340      */
341     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
342     g_commBA->ExchangeClosePending(true);
343     CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
344 
345     /**
346      * @tc.steps: step2. device A send message(registered and tiny) to device B using communicator AA
347      * @tc.expected: step2. communicator AA received the message
348      */
349     Message *msgForAA = BuildRegedTinyMessage();
350     ASSERT_NE(msgForAA, nullptr);
351     SendConfig conf = {false, false, true, 0, {}};
352     int errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
353     EXPECT_EQ(errCode, E_OK);
354     std::this_thread::sleep_for(std::chrono::milliseconds(200)); // sleep 200 ms
355     EXPECT_EQ(recvMsgForAA->GetErrorNo(), E_FEEDBACK_DB_CLOSING);
356     delete recvMsgForAA;
357     recvMsgForAA = nullptr;
358 
359     // CleanUp
360     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
361     CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
362 }
363 
364 /**
365  * @tc.name: Send Flow Control 001
366  * @tc.desc: Test send in nonblock way
367  * @tc.type: FUNC
368  * @tc.require:
369  * @tc.author: xiaozhenjian
370  */
371 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendFlowControl001, TestSize.Level1)
372 {
373     // Preset
374     int countForBA = 0;
375     int countForBB = 0;
__anonf5bf6e8a0202()376     g_commBA->RegOnSendableCallback([&countForBA](){ countForBA++; }, nullptr);
__anonf5bf6e8a0302()377     g_commBB->RegOnSendableCallback([&countForBB](){ countForBB++; }, nullptr);
378 
379     /**
380      * @tc.steps: step1. connect device A with device B
381      */
382     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
383     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms to make sure send cause by online done
384     countForBA = 0;
385     countForBB = 0;
386 
387     /**
388      * @tc.steps: step2. device B simulates send block
389      */
390     g_envDeviceB.adapterHandle->SimulateSendBlock();
391 
392     /**
393      * @tc.steps: step3. device B send as much as possible message(unregistered and huge) in nonblock way
394      *                   to device A using communicator BA until send fail;
395      * @tc.expected: step3. send fail will happen.
396      */
397     int sendCount = 0;
398     while (true) {
399         Message *msgForBA = BuildRegedHugeMessage();
400         ASSERT_NE(msgForBA, nullptr);
401         SendConfig conf = {true, false, true, 0};
402         int errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
403         if (errCode == E_OK) {
404             sendCount++;
405         } else {
406             delete msgForBA;
407             msgForBA = nullptr;
408             break;
409         }
410     }
411 
412     /**
413      * @tc.steps: step4. device B simulates send block terminate
414      * @tc.expected: step4. send count before fail is equal as expected. sendable callback happened.
415      */
416     g_envDeviceB.adapterHandle->SimulateSendBlockClear();
417     int expectSendCount = MAX_CAPACITY / (HUGE_SIZE + HEADER_SIZE) +
418         (MAX_CAPACITY % (HUGE_SIZE + HEADER_SIZE) == 0 ? 0 : 1);
419     EXPECT_EQ(sendCount, expectSendCount);
420     std::this_thread::sleep_for(std::chrono::milliseconds(1000));
421     EXPECT_GE(countForBA, 1);
422     EXPECT_GE(countForBB, 1);
423 
424     // CleanUp
425     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
426 }
427 
428 /**
429  * @tc.name: Send Flow Control 002
430  * @tc.desc: Test send in block(without timeout) way
431  * @tc.type: FUNC
432  * @tc.require:
433  * @tc.author: xiaozhenjian
434  */
435 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendFlowControl002, TestSize.Level1)
436 {
437     // Preset
438     int cntForBA = 0;
439     int cntForBB = 0;
__anonf5bf6e8a0402()440     g_commBA->RegOnSendableCallback([&cntForBA](){ cntForBA++; }, nullptr);
__anonf5bf6e8a0502()441     g_commBB->RegOnSendableCallback([&cntForBB](){ cntForBB++; }, nullptr);
442 
443     /**
444      * @tc.steps: step1. connect device A with device B
445      */
446     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
447     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms to make sure send cause by online done
448     cntForBA = 0;
449     cntForBB = 0;
450 
451     /**
452      * @tc.steps: step2. device B simulates send block
453      */
454     g_envDeviceB.adapterHandle->SimulateSendBlock();
455 
456     /**
457      * @tc.steps: step3. device B send a certain message(unregistered and huge) in block way
458      *                   without timeout to device A using communicator BA;
459      */
460     int sendCount = 0;
461     int sendFailCount = 0;
__anonf5bf6e8a0602() 462     std::thread sendThread([&sendCount, &sendFailCount]() {
463         while (sendCount < SEND_COUNT_GOAL) {
464             Message *msgForBA = BuildRegedHugeMessage();
465             ASSERT_NE(msgForBA, nullptr);
466             SendConfig conf = {false, false, true, 0};
467             int errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
468             if (errCode != E_OK) {
469                 delete msgForBA;
470                 msgForBA = nullptr;
471                 sendFailCount++;
472             }
473             sendCount++;
474         }
475     });
476 
477     /**
478      * @tc.steps: step4. device B simulates send block terminate
479      * @tc.expected: step4. send fail count is zero. sendable callback happened.
480      */
481     std::this_thread::sleep_for(std::chrono::milliseconds(200));
482     g_envDeviceB.adapterHandle->SimulateSendBlockClear();
483     std::this_thread::sleep_for(std::chrono::milliseconds(1000));
484     sendThread.join();
485     EXPECT_EQ(sendCount, SEND_COUNT_GOAL);
486     EXPECT_EQ(sendFailCount, 0);
487     EXPECT_GE(cntForBA, 1);
488     EXPECT_GE(cntForBB, 1);
489 
490     // CleanUp
491     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
492 }
493 
494 /**
495  * @tc.name: Send Flow Control 003
496  * @tc.desc: Test send in block(with timeout) way
497  * @tc.type: FUNC
498  * @tc.require:
499  * @tc.author: xiaozhenjian
500  */
501 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendFlowControl003, TestSize.Level1)
502 {
503     // Preset
504     int cntsForBA = 0;
505     int cntsForBB = 0;
__anonf5bf6e8a0702()506     g_commBA->RegOnSendableCallback([&cntsForBA](){ cntsForBA++; }, nullptr);
__anonf5bf6e8a0802()507     g_commBB->RegOnSendableCallback([&cntsForBB](){ cntsForBB++; }, nullptr);
508 
509     /**
510      * @tc.steps: step1. connect device A with device B
511      */
512     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
513     std::this_thread::sleep_for(std::chrono::milliseconds(100));
514     cntsForBA = 0;
515     cntsForBB = 0;
516 
517     /**
518      * @tc.steps: step2. device B simulates send block
519      */
520     g_envDeviceB.adapterHandle->SimulateSendBlock();
521 
522      /**
523      * @tc.steps: step3. device B send a certain message(unregistered and huge) in block way
524      *                   with timeout to device A using communicator BA;
525      */
526     int sendCnt = 0;
527     int sendFailCnt = 0;
__anonf5bf6e8a0902() 528     std::thread sendThread([&sendCnt, &sendFailCnt]() {
529         while (sendCnt < SEND_COUNT_GOAL) {
530             Message *msgForBA = BuildRegedHugeMessage();
531             ASSERT_NE(msgForBA, nullptr);
532             SendConfig conf = {false, false, true, 100};
533             int errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf); // 100 ms timeout
534             if (errCode != E_OK) {
535                 delete msgForBA;
536                 msgForBA = nullptr;
537                 sendFailCnt++;
538             }
539             sendCnt++;
540         }
541     });
542 
543     /**
544      * @tc.steps: step4. device B simulates send block terminate
545      * @tc.expected: step4. send fail count is no more than expected. sendable callback happened.
546      */
547     std::this_thread::sleep_for(std::chrono::milliseconds(300)); // wait 300 ms
548     g_envDeviceB.adapterHandle->SimulateSendBlockClear();
549     std::this_thread::sleep_for(std::chrono::milliseconds(1200)); // wait 1200 ms
550     sendThread.join();
551     EXPECT_EQ(sendCnt, SEND_COUNT_GOAL);
552     EXPECT_LE(sendFailCnt, 4);
553     EXPECT_GE(cntsForBA, 1);
554     EXPECT_GE(cntsForBB, 1);
555 
556     // CleanUp
557     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
558 }
559 
560 /**
561  * @tc.name: Receive Check 001
562  * @tc.desc: Receive packet field check
563  * @tc.type: FUNC
564  * @tc.require:
565  * @tc.author: xiaozhenjian
566  */
567 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, ReceiveCheck001, TestSize.Level1)
568 {
569     // Preset
570     int recvCount = 0;
__anonf5bf6e8a0a02(const std::string &srcTarget, Message *inMsg) 571     g_commAA->RegOnMessageCallback([&recvCount](const std::string &srcTarget, Message *inMsg) {
572         recvCount++;
573         if (inMsg != nullptr) {
574             delete inMsg;
575             inMsg = nullptr;
576         }
577         return E_OK;
578     }, nullptr);
579     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
580 
581     /**
582      * @tc.steps: step1. create packet with magic field error
583      * @tc.expected: step1. no message callback
584      */
585     g_envDeviceB.adapterHandle->SimulateSendBitErrorInMagicField(true, 0xFFFF);
586     Message *msgForBA = BuildRegedTinyMessage();
587     SendConfig conf = {true, false, true, 0};
588     int errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
589     EXPECT_EQ(errCode, E_OK);
590     std::this_thread::sleep_for(std::chrono::milliseconds(100));
591     EXPECT_EQ(recvCount, 0);
592     g_envDeviceB.adapterHandle->SimulateSendBitErrorInMagicField(false, 0);
593 
594     /**
595      * @tc.steps: step2. create packet with version field error
596      * @tc.expected: step2. no message callback
597      */
598     g_envDeviceB.adapterHandle->SimulateSendBitErrorInVersionField(true, 0xFFFF);
599     msgForBA = BuildRegedTinyMessage();
600     errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
601     EXPECT_EQ(errCode, E_OK);
602     std::this_thread::sleep_for(std::chrono::milliseconds(100));
603     EXPECT_EQ(recvCount, 0);
604     g_envDeviceB.adapterHandle->SimulateSendBitErrorInVersionField(false, 0);
605 
606     /**
607      * @tc.steps: step3. create packet with checksum field error
608      * @tc.expected: step3. no message callback
609      */
610     g_envDeviceB.adapterHandle->SimulateSendBitErrorInCheckSumField(true, 0xFFFF);
611     msgForBA = BuildRegedTinyMessage();
612     errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
613     EXPECT_EQ(errCode, E_OK);
614     std::this_thread::sleep_for(std::chrono::milliseconds(100));
615     EXPECT_EQ(recvCount, 0);
616     g_envDeviceB.adapterHandle->SimulateSendBitErrorInCheckSumField(false, 0);
617 
618     // CleanUp
619     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
620 }
621 
622 /**
623  * @tc.name: Receive Check 002
624  * @tc.desc: Receive packet field check
625  * @tc.type: FUNC
626  * @tc.require:
627  * @tc.author: xiaozhenjian
628  */
629 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, ReceiveCheck002, TestSize.Level1)
630 {
631     // Preset
632     int recvCount = 0;
__anonf5bf6e8a0b02(const std::string &srcTarget, Message *inMsg) 633     g_commAA->RegOnMessageCallback([&recvCount](const std::string &srcTarget, Message *inMsg) {
634         recvCount++;
635         if (inMsg != nullptr) {
636             delete inMsg;
637             inMsg = nullptr;
638         }
639         return E_OK;
640     }, nullptr);
641     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
642 
643     /**
644      * @tc.steps: step1. create packet with packetLen field error
645      * @tc.expected: step1. no message callback
646      */
647     g_envDeviceB.adapterHandle->SimulateSendBitErrorInPacketLenField(true, 0xFFFF);
648     Message *msgForBA = BuildRegedTinyMessage();
649     SendConfig conf = {true, false, true, 0};
650     int errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
651     EXPECT_EQ(errCode, E_OK);
652     std::this_thread::sleep_for(std::chrono::milliseconds(100));
653     EXPECT_EQ(recvCount, 0);
654     g_envDeviceB.adapterHandle->SimulateSendBitErrorInPacketLenField(false, 0);
655 
656     /**
657      * @tc.steps: step1. create packet with packetType field error
658      * @tc.expected: step1. no message callback
659      */
660     g_envDeviceB.adapterHandle->SimulateSendBitErrorInPacketTypeField(true, 0xFF);
661     msgForBA = BuildRegedTinyMessage();
662     errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
663     EXPECT_EQ(errCode, E_OK);
664     std::this_thread::sleep_for(std::chrono::milliseconds(100));
665     EXPECT_EQ(recvCount, 0);
666     g_envDeviceB.adapterHandle->SimulateSendBitErrorInPacketTypeField(false, 0);
667 
668     /**
669      * @tc.steps: step1. create packet with paddingLen field error
670      * @tc.expected: step1. no message callback
671      */
672     g_envDeviceB.adapterHandle->SimulateSendBitErrorInPaddingLenField(true, 0xFF);
673     msgForBA = BuildRegedTinyMessage();
674     errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
675     EXPECT_EQ(errCode, E_OK);
676     std::this_thread::sleep_for(std::chrono::milliseconds(100));
677     EXPECT_EQ(recvCount, 0);
678     g_envDeviceB.adapterHandle->SimulateSendBitErrorInPaddingLenField(false, 0);
679 
680     // CleanUp
681     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
682 }
683 
684 /**
685  * @tc.name: Send Result Notify 001
686  * @tc.desc: Test send result notify
687  * @tc.type: FUNC
688  * @tc.require:
689  * @tc.author: xiaozhenjian
690  */
691 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendResultNotify001, TestSize.Level1)
692 {
693     // preset
694     std::vector<int> sendResult;
__anonf5bf6e8a0c02(int result, bool isDirectEnd) 695     auto sendResultNotifier = [&sendResult](int result, bool isDirectEnd) {
696         sendResult.push_back(result);
697     };
698 
699     /**
700      * @tc.steps: step1. connect device A with device B
701      */
702     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
703 
704     /**
705      * @tc.steps: step2. device A send message to device B using communicator AA
706      * @tc.expected: step2. notify send done and success
707      */
708     Message *msgForAA = BuildRegedTinyMessage();
709     ASSERT_NE(msgForAA, nullptr);
710     SendConfig conf = {false, false, true, 0};
711     int errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf, sendResultNotifier);
712     EXPECT_EQ(errCode, E_OK);
713     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
714     ASSERT_EQ(sendResult.size(), static_cast<size_t>(1)); // 1 notify
715     EXPECT_EQ(sendResult[0], E_OK);
716 
717     /**
718      * @tc.steps: step3. disconnect device A with device B
719      */
720     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
721 
722     /**
723      * @tc.steps: step4. device A send message to device B using communicator AA
724      * @tc.expected: step2. notify send done and fail
725      */
726     msgForAA = BuildRegedTinyMessage();
727     ASSERT_NE(msgForAA, nullptr);
728     errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf, sendResultNotifier);
729     EXPECT_EQ(errCode, E_OK);
730     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
731     ASSERT_EQ(sendResult.size(), static_cast<size_t>(2)); // 2 notify
732     EXPECT_NE(sendResult[1], E_OK); // 1 for second element
733 }
734 
735 /**
736  * @tc.name: Message Feedback 001
737  * @tc.desc: Test feedback not support messageid and communicator not found
738  * @tc.type: FUNC
739  * @tc.require:
740  * @tc.author: xiaozhenjian
741  */
742 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, MessageFeedback001, TestSize.Level1)
743 {
744     CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
745     // preset
746     REG_MESSAGE_CALLBACK(A, A);
747     REG_MESSAGE_CALLBACK(B, A);
748     REG_MESSAGE_CALLBACK(B, B);
749 
750     /**
751      * @tc.steps: step1. connect device A with device B
752      */
753     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
754 
755     /**
756      * @tc.steps: step2. device B send message to device A using communicator BB
757      * @tc.expected: step2. communicator BB receive communicator not found feedback
758      */
759     Message *msgForBB = BuildRegedTinyMessage();
760     ASSERT_NE(msgForBB, nullptr);
761     SendConfig conf = {false, false, true, 0};
762     int errCode = g_commBB->SendMessage(DEVICE_NAME_A, msgForBB, conf);
763     EXPECT_EQ(errCode, E_OK);
764     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
765     ASSERT_NE(recvMsgForBB, nullptr);
766     EXPECT_EQ(srcTargetForBB, DEVICE_NAME_A);
767     EXPECT_EQ(recvMsgForBB->GetMessageId(), REGED_TINY_MSG_ID);
768     EXPECT_EQ(recvMsgForBB->GetMessageType(), TYPE_RESPONSE);
769     EXPECT_EQ(recvMsgForBB->GetSessionId(), FIXED_SESSIONID);
770     EXPECT_EQ(recvMsgForBB->GetSequenceId(), FIXED_SEQUENCEID);
771     EXPECT_EQ(recvMsgForBB->GetErrorNo(), static_cast<uint32_t>(E_FEEDBACK_COMMUNICATOR_NOT_FOUND));
772     EXPECT_EQ(recvMsgForBB->GetObject<RegedTinyObject>(), nullptr);
773     delete recvMsgForBB;
774     recvMsgForBB = nullptr;
775 
776     /**
777      * @tc.steps: step3. simulate messageid not registered
778      */
779     g_envDeviceB.adapterHandle->SimulateSendBitErrorInMessageIdField(true, UNREGED_TINY_MSG_ID);
780 
781     /**
782      * @tc.steps: step4. device B send message to device A using communicator BA
783      * @tc.expected: step4. communicator BA receive messageid not register feedback
784      */
785     Message *msgForBA = BuildRegedTinyMessage();
786     ASSERT_NE(msgForBA, nullptr);
787     errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
788     EXPECT_EQ(errCode, E_OK);
789     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
790     ASSERT_NE(recvMsgForBA, nullptr);
791     EXPECT_EQ(srcTargetForBA, DEVICE_NAME_A);
792     EXPECT_EQ(recvMsgForBA->GetMessageId(), UNREGED_TINY_MSG_ID);
793     EXPECT_EQ(recvMsgForBA->GetMessageType(), TYPE_RESPONSE);
794     EXPECT_EQ(recvMsgForBA->GetSessionId(), FIXED_SESSIONID);
795     EXPECT_EQ(recvMsgForBA->GetSequenceId(), FIXED_SEQUENCEID);
796     EXPECT_EQ(recvMsgForBA->GetErrorNo(), static_cast<uint32_t>(E_FEEDBACK_UNKNOWN_MESSAGE));
797     EXPECT_EQ(recvMsgForBA->GetObject<RegedTinyObject>(), nullptr);
798     delete recvMsgForBA;
799     recvMsgForBA = nullptr;
800 
801     // CleanUp
802     g_envDeviceB.adapterHandle->SimulateSendBitErrorInMessageIdField(false, 0);
803     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
804     CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
805 }
806 
807 /**
808  * @tc.name: SendAndReceiveWithExtendHead001
809  * @tc.desc: Test fill extendHead func
810  * @tc.type: FUNC
811  * @tc.require:
812  * @tc.author: zhuwentao
813  */
814 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendAndReceiveWithExtendHead001, TestSize.Level1)
815 {
816     // Preset
817     TimeSync::RegisterTransformFunc();
818     REG_MESSAGE_CALLBACK(A, A);
819     REG_MESSAGE_CALLBACK(B, A);
820 
821     /**
822      * @tc.steps: step1. connect device A with device B
823      */
824     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
825 
826     /**
827      * @tc.steps: step2. device A send ApplayerFrameMessage to device B using communicator AA with extednHead
828      * @tc.expected: step2. communicator BA received the message
829      */
830     Message *msgForAA = BuildAppLayerFrameMessage();
831     ASSERT_NE(msgForAA, nullptr);
832     UserInfo userInfo = {"", "userId"};
833     g_envDeviceB.adapterHandle->SetUserInfo({userInfo});
834     SendConfig conf = {false, true, true, 0, {"appId", "storeId", "", "DeviceB"}};
835     int errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
836     EXPECT_EQ(errCode, E_OK);
837     std::this_thread::sleep_for(std::chrono::milliseconds(200)); // sleep 200 ms
838     EXPECT_EQ(srcTargetForBA, DEVICE_NAME_A);
839     ASSERT_NE(recvMsgForBA, nullptr);
840     delete recvMsgForBA;
841     recvMsgForBA = nullptr;
842     DistributedDB::ProtocolProto::UnRegTransformFunction(DistributedDB::TIME_SYNC_MESSAGE);
843     // CleanUp
844     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
845 }
846 
847 /**
848  * @tc.name: GetDataUserInfoTest001
849  * @tc.desc: Test GetDataUserInfo return NEED_CORRECT_TARGET_USER
850  * @tc.type: FUNC
851  * @tc.require:
852  * @tc.author: liaoyonghuang
853  */
854 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, GetDataUserInfoTest001, TestSize.Level1)
855 {
856     // Preset
857     TimeSync::RegisterTransformFunc();
858     REG_MESSAGE_CALLBACK(A, A);
859     REG_MESSAGE_CALLBACK(B, A);
860 
861     /**
862      * @tc.steps: step1. connect device A with device B
863      */
864     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
865 
866     /**
867      * @tc.steps: step2. return NEED_CORRECT_TARGET_USER when get data user info
868      */
869     std::shared_ptr<ProcessCommunicatorTestStub> processCommunicator = std::make_shared<ProcessCommunicatorTestStub>();
870     processCommunicator->SetGetDataUserInfoRet(NEED_CORRECT_TARGET_USER);
871     g_envDeviceB.adapterHandle->SetProcessCommunicator(processCommunicator);
872     /**
873      * @tc.steps: step3. device A send ApplayerFrameMessage to device B using communicator AA with extednHead
874      * @tc.expected: step3. communicator AA received the message, errNo is E_NEED_CORRECT_TARGET_USER
875      */
876     Message *msgForAA = BuildAppLayerFrameMessage();
877     ASSERT_NE(msgForAA, nullptr);
878     UserInfo userInfo = {"", "userId"};
879     g_envDeviceB.adapterHandle->SetUserInfo({userInfo});
880     SendConfig conf = {false, true, true, 0, {"appId", "storeId", "", "DeviceB"}};
881     int errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
882     EXPECT_EQ(errCode, E_OK);
883     std::this_thread::sleep_for(std::chrono::milliseconds(200)); // sleep 200 ms
884     ASSERT_NE(recvMsgForAA, nullptr);
885     EXPECT_EQ(recvMsgForAA->GetErrorNo(), E_NEED_CORRECT_TARGET_USER);
886     delete recvMsgForAA;
887     recvMsgForAA = nullptr;
888     DistributedDB::ProtocolProto::UnRegTransformFunction(DistributedDB::TIME_SYNC_MESSAGE);
889     // CleanUp
890     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
891 }
892 
893 /**
894  * @tc.name: ToSerialBufferTest001
895  * @tc.desc: Test ToSerialBuffer func
896  * @tc.type: FUNC
897  * @tc.require:
898  * @tc.author: tiansimiao
899  */
900 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, ToSerialBufferTest001, TestSize.Level1)
901 {
902     std::shared_ptr<ExtendHeaderHandle> extendHandle;
903     int errorNo = E_OK;
904     SerialBuffer* buffer = ProtocolProto::ToSerialBuffer(nullptr, extendHandle, false, errorNo);
905     EXPECT_EQ(errorNo, -E_INVALID_ARGS);
906     EXPECT_EQ(buffer, nullptr);
907 }
908 
909 /**
910  * @tc.name: ToMessageTest001
911  * @tc.desc: Test ToMessage func
912  * @tc.type: FUNC
913  * @tc.require:
914  * @tc.author: tiansimiao
915  */
916 HWTEST_F(DistributedDBCommunicatorSendReceiveTest, ToMessageTest001, TestSize.Level1)
917 {
918     int errorNo = E_OK;
919     Message* msg = ProtocolProto::ToMessage(nullptr, errorNo, false);
920     EXPECT_EQ(errorNo, -E_INVALID_ARGS);
921     EXPECT_EQ(msg, nullptr);
922 }